Jackie-Jiang commented on code in PR #17532:
URL: https://github.com/apache/pinot/pull/17532#discussion_r2791163442


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java:
##########
@@ -1248,6 +1324,56 @@ boolean isDisabled() {
       return _disabled;
     }
 
+    private void updateSamplerSegments(Set<String> preSelectedOnlineSegments) {
+      if (_samplers.isEmpty()) {
+        _samplerSegments = Collections.emptyMap();
+        return;
+      }
+      Set<String> readOnlyPreSelectedOnlineSegments =
+          Collections.unmodifiableSet(new 
HashSet<>(preSelectedOnlineSegments));
+      Map<String, Set<String>> samplerSegments = new HashMap<>();
+      for (Map.Entry<String, TableSampler> entry : _samplers.entrySet()) {
+        String samplerName = entry.getKey();
+        try {
+          Set<String> sampledSegments = 
entry.getValue().sampleSegments(readOnlyPreSelectedOnlineSegments);
+          if (sampledSegments != null) {
+            samplerSegments.put(samplerName, Set.copyOf(sampledSegments));
+          } else {
+            LOGGER.warn("Sampler '{}' returned null segments for table '{}', 
treating it as empty", samplerName,
+                _tableNameWithType);
+          }
+        } catch (Exception e) {
+          LOGGER.error("Caught unexpected exception while sampling segments 
for sampler: {} for table: {}", samplerName,
+              _tableNameWithType, e);
+        }
+      }
+      _samplerSegments = samplerSegments.isEmpty() ? Collections.emptyMap() : 
Map.copyOf(samplerSegments);
+    }
+
+    @Nullable
+    private Set<String> getSampledSegments(@Nullable String samplerName) {
+      if (StringUtils.isNotBlank(samplerName)) {
+        String normalizedSamplerName = normalizeSamplerName(samplerName);
+        Set<String> sampledSegments = 
_samplerSegments.get(normalizedSamplerName);
+        if (sampledSegments != null) {
+          return sampledSegments;
+        }
+        LOGGER.warn("Requested sampler '{}' not found for table '{}'; falling 
back to default routing entry",
+            samplerName, _tableNameWithType);
+      }
+      return null;
+    }
+
+    private Set<String> maybeApplySampler(Set<String> selectedSegments, 
@Nullable String samplerName) {
+      Set<String> sampledSegments = getSampledSegments(samplerName);
+      if (sampledSegments != null && !selectedSegments.isEmpty()) {
+        Set<String> sampledSelectedSegments = new HashSet<>(selectedSegments);
+        sampledSelectedSegments.retainAll(sampledSegments);

Review Comment:
   This is expensive. For each sampler, we should have a dedicated 
`SegmentSelector`. See the previous comment about the `SamplerInfo` wrapper class



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java:
##########
@@ -1210,7 +1283,10 @@ private static class RoutingEntry {
       _timeBoundaryManager = timeBoundaryManager;
       _partitionMetadataManager = partitionMetadataManager;
       _queryTimeoutMs = queryTimeoutMs;
+      _samplers = samplers.isEmpty() ? Collections.emptyMap() : 
Map.copyOf(samplers);

Review Comment:
   (nit) Use `Map.of()` for the empty map; no need to copy the map



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java:
##########
@@ -810,10 +820,44 @@ private void buildRoutingInternal(String 
tableNameWithType) {
       }
       segmentZkMetadataFetcher.init(idealState, externalView, 
preSelectedOnlineSegments);
 
+      // Build table samplers keyed by normalized sampler name.
+      Map<String, TableSampler> samplers = Collections.emptyMap();
+      List<TableSamplerConfig> tableSamplerConfigs = 
tableConfig.getTableSamplers();
+      if (CollectionUtils.isNotEmpty(tableSamplerConfigs)) {
+        Map<String, TableSampler> configuredSamplers = new HashMap<>();
+        for (TableSamplerConfig samplerConfig : tableSamplerConfigs) {
+          String samplerName = samplerConfig.getName();
+          String samplerType = samplerConfig.getType();
+          if (StringUtils.isBlank(samplerName) || 
StringUtils.isBlank(samplerType)) {
+            LOGGER.warn("Skipping invalid table sampler config for table: {}, 
samplerName: {}, samplerType: {}",
+                tableNameWithType, samplerName, samplerType);
+            continue;
+          }
+          String normalizedSamplerName = normalizeSamplerName(samplerName);
+          if (configuredSamplers.containsKey(normalizedSamplerName)) {
+            LOGGER.warn("Skipping duplicate normalized table sampler name: 
'{}' for table: {}", samplerName,
+                tableNameWithType);
+            continue;
+          }
+          try {
+            TableSampler sampler = TableSamplerFactory.create(samplerType);
+            sampler.init(tableConfig, samplerConfig, _propertyStore);
+            configuredSamplers.put(normalizedSamplerName, sampler);
+          } catch (Exception e) {
+            LOGGER.error("Caught unexpected exception while building routing 
for table sampler: {} for table: {}",
+                samplerName, tableNameWithType, e);
+          }
+        }
+        if (!configuredSamplers.isEmpty()) {
+          samplers = Map.copyOf(configuredSamplers);

Review Comment:
   (minor) Seems you are making multiple copies. I don't think you need to make 
any copy. You can create a pre-sized hash map



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java:
##########
@@ -1086,9 +1132,34 @@ private Map<ServerInstance, SegmentsToQuery> 
getServerInstanceToSegmentsMap(Stri
   @Nullable
   @Override
   public List<String> getSegments(BrokerRequest brokerRequest) {
+    return getSegments(brokerRequest, extractSamplerName(brokerRequest));
+  }
+
+  @Nullable
+  @Override
+  public List<String> getSegments(BrokerRequest brokerRequest, @Nullable 
String samplerName) {
     String tableNameWithType = brokerRequest.getQuerySource().getTableName();
     RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType);
-    return routingEntry != null ? routingEntry.getSegments(brokerRequest) : 
null;
+    if (routingEntry == null) {
+      return null;
+    }
+    return routingEntry.getSegments(brokerRequest, samplerName);
+  }
+
+  private static String normalizeSamplerName(String samplerName) {
+    return samplerName.trim().toLowerCase(Locale.ROOT);
+  }
+
+  @Nullable
+  static String extractSamplerName(BrokerRequest brokerRequest) {
+    if (!brokerRequest.isSetPinotQuery()) {
+      return null;
+    }
+    PinotQuery pinotQuery = brokerRequest.getPinotQuery();
+    if (!pinotQuery.isSetQueryOptions()) {
+      return null;
+    }

Review Comment:
   Both checks can be removed



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java:
##########
@@ -1248,6 +1324,56 @@ boolean isDisabled() {
       return _disabled;
     }
 
+    private void updateSamplerSegments(Set<String> preSelectedOnlineSegments) {
+      if (_samplers.isEmpty()) {
+        _samplerSegments = Collections.emptyMap();
+        return;
+      }
+      Set<String> readOnlyPreSelectedOnlineSegments =
+          Collections.unmodifiableSet(new 
HashSet<>(preSelectedOnlineSegments));
+      Map<String, Set<String>> samplerSegments = new HashMap<>();
+      for (Map.Entry<String, TableSampler> entry : _samplers.entrySet()) {
+        String samplerName = entry.getKey();
+        try {
+          Set<String> sampledSegments = 
entry.getValue().sampleSegments(readOnlyPreSelectedOnlineSegments);
+          if (sampledSegments != null) {
+            samplerSegments.put(samplerName, Set.copyOf(sampledSegments));
+          } else {
+            LOGGER.warn("Sampler '{}' returned null segments for table '{}', 
treating it as empty", samplerName,
+                _tableNameWithType);
+          }
+        } catch (Exception e) {
+          LOGGER.error("Caught unexpected exception while sampling segments 
for sampler: {} for table: {}", samplerName,
+              _tableNameWithType, e);
+        }
+      }
+      _samplerSegments = samplerSegments.isEmpty() ? Collections.emptyMap() : 
Map.copyOf(samplerSegments);

Review Comment:
   Reduce redundant copy



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java:
##########
@@ -1086,9 +1132,34 @@ private Map<ServerInstance, SegmentsToQuery> 
getServerInstanceToSegmentsMap(Stri
   @Nullable
   @Override
   public List<String> getSegments(BrokerRequest brokerRequest) {
+    return getSegments(brokerRequest, extractSamplerName(brokerRequest));
+  }
+
+  @Nullable
+  @Override
+  public List<String> getSegments(BrokerRequest brokerRequest, @Nullable 
String samplerName) {
     String tableNameWithType = brokerRequest.getQuerySource().getTableName();
     RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType);
-    return routingEntry != null ? routingEntry.getSegments(brokerRequest) : 
null;
+    if (routingEntry == null) {
+      return null;
+    }
+    return routingEntry.getSegments(brokerRequest, samplerName);
+  }
+
+  private static String normalizeSamplerName(String samplerName) {
+    return samplerName.trim().toLowerCase(Locale.ROOT);
+  }
+
+  @Nullable
+  static String extractSamplerName(BrokerRequest brokerRequest) {
+    if (!brokerRequest.isSetPinotQuery()) {
+      return null;
+    }
+    PinotQuery pinotQuery = brokerRequest.getPinotQuery();
+    if (!pinotQuery.isSetQueryOptions()) {
+      return null;
+    }
+    return 
pinotQuery.getQueryOptions().get(CommonConstants.Broker.Request.QueryOptionKey.TABLE_SAMPLER);

Review Comment:
   Move this into `QueryOptionsUtils`



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java:
##########
@@ -1248,6 +1324,56 @@ boolean isDisabled() {
       return _disabled;
     }
 
+    private void updateSamplerSegments(Set<String> preSelectedOnlineSegments) {
+      if (_samplers.isEmpty()) {
+        _samplerSegments = Collections.emptyMap();
+        return;
+      }
+      Set<String> readOnlyPreSelectedOnlineSegments =
+          Collections.unmodifiableSet(new 
HashSet<>(preSelectedOnlineSegments));

Review Comment:
   Reduce copies. Just make it part of the contract that a sampler shouldn't modify the input



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java:
##########
@@ -1044,11 +1088,13 @@ public RoutingTable getRoutingTable(BrokerRequest 
brokerRequest, long requestId)
   @Nullable
   @Override
   public RoutingTable getRoutingTable(BrokerRequest brokerRequest, String 
tableNameWithType, long requestId) {
+    String samplerName = extractSamplerName(brokerRequest);

Review Comment:
   (minor) Move this after reading `RoutingEntry`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to