Jackie-Jiang commented on code in PR #17532:
URL: https://github.com/apache/pinot/pull/17532#discussion_r2784473354


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java:
##########
@@ -351,6 +359,14 @@ private boolean processAssignmentChangeForTable(int idealStateVersion, int exter
           return true;
         }
         routingEntry.onAssignmentChange(idealState, externalView);
+
+        // Handle sampler routing entries
+        Map<String, RoutingEntry> samplerEntries = _samplerRoutingEntryMap.get(tableNameWithType);
+        if (samplerEntries != null && !samplerEntries.isEmpty()) {

Review Comment:
   (nit) `samplerEntries` can never be empty, so the `isEmpty()` check is redundant



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java:
##########
@@ -812,14 +852,74 @@ private void buildRoutingInternal(String tableNameWithType) {
 
       RoutingEntry routingEntry =
           new RoutingEntry(tableNameWithType, idealStatePath, externalViewPath, segmentPreSelector, segmentSelector,
-              segmentPruners, instanceSelector, idealStateVersion, externalViewVersion, segmentZkMetadataFetcher,
+              null, segmentPruners, instanceSelector, idealStateVersion, externalViewVersion, segmentZkMetadataFetcher,
               timeBoundaryManager, partitionMetadataManager, queryTimeoutMs, !idealState.isEnabled());
       if (_routingEntryMap.put(tableNameWithType, routingEntry) == null) {
         LOGGER.info("Built routing for table: {}", tableNameWithType);
       } else {
         LOGGER.info("Rebuilt routing for table: {}", tableNameWithType);
       }
 
+      // Build routing entries for configured table samplers (if any).
+      // These entries use the same underlying routing components, but operate on a deterministically sampled set of
+      // segments selected during routing build.
+      List<TableSamplerConfig> tableSamplerConfigs = tableConfig.getTableSamplers();
+      if (CollectionUtils.isNotEmpty(tableSamplerConfigs)) {
+        Map<String, RoutingEntry> samplerRoutingEntries = new HashMap<>();
+        for (TableSamplerConfig samplerConfig : tableSamplerConfigs) {
+          String samplerName = samplerConfig.getName();
+          String samplerType = samplerConfig.getType();
+          if (StringUtils.isBlank(samplerName) || StringUtils.isBlank(samplerType)) {
+            LOGGER.warn("Skipping invalid table sampler config for table: {}, samplerName: {}, samplerType: {}",
+                tableNameWithType, samplerName, samplerType);
+            continue;
+          }
+          try {
+            TableSampler sampler = TableSamplerFactory.create(samplerType);
+            sampler.init(tableConfig, samplerConfig, _propertyStore);
+
+            // Apply sampler after base pre-selection (e.g. segment lineage).
+            Set<String> samplerPreSelectedOnlineSegments =
+                sampler.selectSegments(new HashSet<>(preSelectedOnlineSegments));
+            SegmentSelector samplerSegmentSelector = SegmentSelectorFactory.getSegmentSelector(tableConfig);
+            samplerSegmentSelector.init(idealState, externalView, samplerPreSelectedOnlineSegments);
+
+            List<SegmentPruner> samplerSegmentPruners =
+                SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+            AdaptiveServerSelector samplerAdaptiveServerSelector =
+                AdaptiveServerSelectorFactory.getAdaptiveServerSelector(_serverRoutingStatsManager, _pinotConfig);
+            InstanceSelector samplerInstanceSelector =
+                InstanceSelectorFactory.getInstanceSelector(tableConfig, _propertyStore, _brokerMetrics,
+                    samplerAdaptiveServerSelector, _pinotConfig, _routableServers, _enabledServerInstanceMap,
+                    idealState, externalView, samplerPreSelectedOnlineSegments);
+
+            SegmentZkMetadataFetcher samplerSegmentZkMetadataFetcher =
+                new SegmentZkMetadataFetcher(tableNameWithType, _propertyStore);
+            for (SegmentZkMetadataFetchListener listener : samplerSegmentPruners) {
+              samplerSegmentZkMetadataFetcher.register(listener);
+            }
+            samplerSegmentZkMetadataFetcher.init(idealState, externalView, samplerPreSelectedOnlineSegments);
+
+            RoutingEntry samplerRoutingEntry =
+                new RoutingEntry(tableNameWithType, idealStatePath, externalViewPath, segmentPreSelector,
+                    samplerSegmentSelector, sampler, samplerSegmentPruners, samplerInstanceSelector, idealStateVersion,
+                    externalViewVersion, samplerSegmentZkMetadataFetcher, null, null, queryTimeoutMs,

Review Comment:
   It seems the time boundary and partitioning are not handled for the sampler
   entry (both managers are passed as `null`)?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java:
##########
@@ -1177,6 +1311,8 @@ private static class RoutingEntry {
     final String _idealStatePath;
     final String _externalViewPath;
     final SegmentPreSelector _segmentPreSelector;
+    @Nullable
+    final TableSampler _tableSampler;

Review Comment:
   Most of the content in the sampler entry can be shared with the main entry.
   I think an easier way to handle sampling is to add a `SamplerInfo` class for
   sampler-specific context (probably only `TableSampler`, `SegmentSelector` and
   `InstanceSelector`). This way most of the repeated work can be avoided, and
   time boundary and partitioning info are supported automatically.
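
   A minimal sketch of that shape, to make the suggestion concrete. Every type
   below is a hypothetical stand-in reduced to what the sketch needs, not the
   actual Pinot class, and falling back to the main selectors when the sampler
   name is null or unknown is an assumption rather than something settled in
   this PR.

   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;

   /** Sketch only: all nested types are hypothetical stand-ins for the routing classes. */
   final class SamplerInfoSketch {
     interface TableSampler {
       Set<String> sampleSegments(Set<String> onlineSegments);
     }

     interface SegmentSelector { }

     interface InstanceSelector { }

     /** Holds only the state that differs per sampler. */
     static final class SamplerInfo {
       final TableSampler _tableSampler;
       final SegmentSelector _segmentSelector;
       final InstanceSelector _instanceSelector;

       SamplerInfo(TableSampler tableSampler, SegmentSelector segmentSelector, InstanceSelector instanceSelector) {
         _tableSampler = tableSampler;
         _segmentSelector = segmentSelector;
         _instanceSelector = instanceSelector;
       }
     }

     /** One entry per table: shared components live here once, sampler state is keyed by sampler name. */
     static final class RoutingEntry {
       final SegmentSelector _segmentSelector;
       final InstanceSelector _instanceSelector;
       final Map<String, SamplerInfo> _samplerInfoMap;

       RoutingEntry(SegmentSelector segmentSelector, InstanceSelector instanceSelector,
           Map<String, SamplerInfo> samplerInfoMap) {
         _segmentSelector = segmentSelector;
         _instanceSelector = instanceSelector;
         _samplerInfoMap = Collections.unmodifiableMap(new HashMap<>(samplerInfoMap));
       }

       /** Returns the sampler's selector, or the main one when no sampler matches (assumed fallback). */
       SegmentSelector getSegmentSelector(String samplerName) {
         SamplerInfo info = samplerName != null ? _samplerInfoMap.get(samplerName) : null;
         return info != null ? info._segmentSelector : _segmentSelector;
       }

       /** Same lookup rule as getSegmentSelector, applied to the instance selector. */
       InstanceSelector getInstanceSelector(String samplerName) {
         SamplerInfo info = samplerName != null ? _samplerInfoMap.get(samplerName) : null;
         return info != null ? info._instanceSelector : _instanceSelector;
       }
     }
   }
   ```

   With this layout the pruners, metadata fetcher, time boundary manager and
   partition metadata manager stay on the single entry, so a sampled query
   would reuse them instead of each sampler building its own.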



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/TableSampler.java:
##########
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.tablesampler;
+
+import java.util.Set;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
+
+
+/**
+ * A {@code TableSampler} deterministically selects a subset of segments from the set of online segments for a table.
+ *
+ * <p>Selection is performed during routing table build/update so there is no additional per-query overhead beyond
+ * selecting the pre-built routing entry.
+ */
+public interface TableSampler {
+
+  /**
+   * Initializes the sampler for a specific table and sampler config.
+   */
+  void init(TableConfig tableConfig, TableSamplerConfig samplerConfig, ZkHelixPropertyStore<ZNRecord> propertyStore);
+
+  /**
+   * Selects a subset of segments from the provided online segments.
+   */
+  Set<String> selectSegments(Set<String> onlineSegments);

Review Comment:
   ```suggestion
     Set<String> sampleSegments(Set<String> onlineSegments);
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/MultiClusterRoutingManager.java:
##########


Review Comment:
   Avoid repeating the sampler name parsing logic
   ```suggestion
       return getSegments(brokerRequest, extractSamplerName(brokerRequest));
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java:
##########
@@ -812,14 +852,74 @@ private void buildRoutingInternal(String tableNameWithType) {
 
       RoutingEntry routingEntry =
           new RoutingEntry(tableNameWithType, idealStatePath, externalViewPath, segmentPreSelector, segmentSelector,
-              segmentPruners, instanceSelector, idealStateVersion, externalViewVersion, segmentZkMetadataFetcher,
+              null, segmentPruners, instanceSelector, idealStateVersion, externalViewVersion, segmentZkMetadataFetcher,
               timeBoundaryManager, partitionMetadataManager, queryTimeoutMs, !idealState.isEnabled());
       if (_routingEntryMap.put(tableNameWithType, routingEntry) == null) {
         LOGGER.info("Built routing for table: {}", tableNameWithType);
       } else {
         LOGGER.info("Rebuilt routing for table: {}", tableNameWithType);
       }
 
+      // Build routing entries for configured table samplers (if any).
+      // These entries use the same underlying routing components, but operate on a deterministically sampled set of
+      // segments selected during routing build.
+      List<TableSamplerConfig> tableSamplerConfigs = tableConfig.getTableSamplers();
+      if (CollectionUtils.isNotEmpty(tableSamplerConfigs)) {
+        Map<String, RoutingEntry> samplerRoutingEntries = new HashMap<>();
+        for (TableSamplerConfig samplerConfig : tableSamplerConfigs) {
+          String samplerName = samplerConfig.getName();
+          String samplerType = samplerConfig.getType();
+          if (StringUtils.isBlank(samplerName) || StringUtils.isBlank(samplerType)) {
+            LOGGER.warn("Skipping invalid table sampler config for table: {}, samplerName: {}, samplerType: {}",
+                tableNameWithType, samplerName, samplerType);
+            continue;
+          }
+          try {
+            TableSampler sampler = TableSamplerFactory.create(samplerType);
+            sampler.init(tableConfig, samplerConfig, _propertyStore);
+
+            // Apply sampler after base pre-selection (e.g. segment lineage).
+            Set<String> samplerPreSelectedOnlineSegments =
+                sampler.selectSegments(new HashSet<>(preSelectedOnlineSegments));

Review Comment:
   Cloning the set upfront adds overhead on every routing build. Would it work
   to make it part of the contract that a sampler must not modify the input set?
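
   A rough sketch of what that contract could look like, assuming the method is
   renamed to `sampleSegments` as suggested elsewhere in this review. The
   `HashPercentSampler` and the `sampleWithoutCloning` helper are hypothetical
   illustrations, not code from this PR. Wrapping the input with
   `Collections.unmodifiableSet` creates a view instead of copying the
   elements, and makes a sampler that breaks the contract fail fast.

   ```java
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.Set;

   /** Sketch only: nested types are hypothetical stand-ins, not the PR's classes. */
   final class SamplerContractSketch {
     interface TableSampler {
       /**
        * Selects a subset of the given online segments. Implementations must not modify
        * {@code onlineSegments} and must return a separate set.
        */
       Set<String> sampleSegments(Set<String> onlineSegments);
     }

     /** Hypothetical sampler: keeps a segment when its name hashes into the first {@code percent} buckets. */
     static final class HashPercentSampler implements TableSampler {
       private final int _percent;

       HashPercentSampler(int percent) {
         if (percent < 0 || percent > 100) {
           throw new IllegalArgumentException("percent must be in [0, 100], got: " + percent);
         }
         _percent = percent;
       }

       @Override
       public Set<String> sampleSegments(Set<String> onlineSegments) {
         Set<String> sampled = new HashSet<>();
         for (String segment : onlineSegments) {
           // String.hashCode() is specified by the JDK, so a given name always lands in the same bucket.
           if (Math.floorMod(segment.hashCode(), 100) < _percent) {
             sampled.add(segment);
           }
         }
         return sampled;
       }
     }

     /** Passes a read-only view instead of a copy; a mutating sampler gets UnsupportedOperationException. */
     static Set<String> sampleWithoutCloning(TableSampler sampler, Set<String> preSelectedOnlineSegments) {
       return sampler.sampleSegments(Collections.unmodifiableSet(preSelectedOnlineSegments));
     }
   }
   ```

   The read-only wrapper is optional; documenting the contract in the Javadoc
   alone would also remove the copy, at the cost of not catching a misbehaving
   sampler.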



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java:
##########
@@ -812,14 +852,74 @@ private void buildRoutingInternal(String tableNameWithType) {
 
       RoutingEntry routingEntry =
           new RoutingEntry(tableNameWithType, idealStatePath, externalViewPath, segmentPreSelector, segmentSelector,
-              segmentPruners, instanceSelector, idealStateVersion, externalViewVersion, segmentZkMetadataFetcher,
+              null, segmentPruners, instanceSelector, idealStateVersion, externalViewVersion, segmentZkMetadataFetcher,
               timeBoundaryManager, partitionMetadataManager, queryTimeoutMs, !idealState.isEnabled());
       if (_routingEntryMap.put(tableNameWithType, routingEntry) == null) {
         LOGGER.info("Built routing for table: {}", tableNameWithType);
       } else {
         LOGGER.info("Rebuilt routing for table: {}", tableNameWithType);
       }
 
+      // Build routing entries for configured table samplers (if any).
+      // These entries use the same underlying routing components, but operate on a deterministically sampled set of
+      // segments selected during routing build.
+      List<TableSamplerConfig> tableSamplerConfigs = tableConfig.getTableSamplers();
+      if (CollectionUtils.isNotEmpty(tableSamplerConfigs)) {
+        Map<String, RoutingEntry> samplerRoutingEntries = new HashMap<>();
+        for (TableSamplerConfig samplerConfig : tableSamplerConfigs) {
+          String samplerName = samplerConfig.getName();
+          String samplerType = samplerConfig.getType();
+          if (StringUtils.isBlank(samplerName) || StringUtils.isBlank(samplerType)) {
+            LOGGER.warn("Skipping invalid table sampler config for table: {}, samplerName: {}, samplerType: {}",
+                tableNameWithType, samplerName, samplerType);
+            continue;
+          }
+          try {
+            TableSampler sampler = TableSamplerFactory.create(samplerType);
+            sampler.init(tableConfig, samplerConfig, _propertyStore);
+
+            // Apply sampler after base pre-selection (e.g. segment lineage).
+            Set<String> samplerPreSelectedOnlineSegments =
+                sampler.selectSegments(new HashSet<>(preSelectedOnlineSegments));
+            SegmentSelector samplerSegmentSelector = SegmentSelectorFactory.getSegmentSelector(tableConfig);
+            samplerSegmentSelector.init(idealState, externalView, samplerPreSelectedOnlineSegments);
+
+            List<SegmentPruner> samplerSegmentPruners =
+                SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+            AdaptiveServerSelector samplerAdaptiveServerSelector =
+                AdaptiveServerSelectorFactory.getAdaptiveServerSelector(_serverRoutingStatsManager, _pinotConfig);
+            InstanceSelector samplerInstanceSelector =
+                InstanceSelectorFactory.getInstanceSelector(tableConfig, _propertyStore, _brokerMetrics,
+                    samplerAdaptiveServerSelector, _pinotConfig, _routableServers, _enabledServerInstanceMap,
+                    idealState, externalView, samplerPreSelectedOnlineSegments);
+
+            SegmentZkMetadataFetcher samplerSegmentZkMetadataFetcher =
+                new SegmentZkMetadataFetcher(tableNameWithType, _propertyStore);
+            for (SegmentZkMetadataFetchListener listener : samplerSegmentPruners) {
+              samplerSegmentZkMetadataFetcher.register(listener);
+            }
+            samplerSegmentZkMetadataFetcher.init(idealState, externalView, samplerPreSelectedOnlineSegments);

Review Comment:
   This can be shared with the main routing



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java:
##########
@@ -812,14 +852,74 @@ private void buildRoutingInternal(String tableNameWithType) {
 
       RoutingEntry routingEntry =
           new RoutingEntry(tableNameWithType, idealStatePath, externalViewPath, segmentPreSelector, segmentSelector,
-              segmentPruners, instanceSelector, idealStateVersion, externalViewVersion, segmentZkMetadataFetcher,
+              null, segmentPruners, instanceSelector, idealStateVersion, externalViewVersion, segmentZkMetadataFetcher,
               timeBoundaryManager, partitionMetadataManager, queryTimeoutMs, !idealState.isEnabled());
       if (_routingEntryMap.put(tableNameWithType, routingEntry) == null) {
         LOGGER.info("Built routing for table: {}", tableNameWithType);
       } else {
         LOGGER.info("Rebuilt routing for table: {}", tableNameWithType);
       }
 
+      // Build routing entries for configured table samplers (if any).
+      // These entries use the same underlying routing components, but operate on a deterministically sampled set of
+      // segments selected during routing build.
+      List<TableSamplerConfig> tableSamplerConfigs = tableConfig.getTableSamplers();
+      if (CollectionUtils.isNotEmpty(tableSamplerConfigs)) {
+        Map<String, RoutingEntry> samplerRoutingEntries = new HashMap<>();
+        for (TableSamplerConfig samplerConfig : tableSamplerConfigs) {
+          String samplerName = samplerConfig.getName();
+          String samplerType = samplerConfig.getType();
+          if (StringUtils.isBlank(samplerName) || StringUtils.isBlank(samplerType)) {
+            LOGGER.warn("Skipping invalid table sampler config for table: {}, samplerName: {}, samplerType: {}",
+                tableNameWithType, samplerName, samplerType);
+            continue;
+          }
+          try {
+            TableSampler sampler = TableSamplerFactory.create(samplerType);
+            sampler.init(tableConfig, samplerConfig, _propertyStore);
+
+            // Apply sampler after base pre-selection (e.g. segment lineage).
+            Set<String> samplerPreSelectedOnlineSegments =
+                sampler.selectSegments(new HashSet<>(preSelectedOnlineSegments));
+            SegmentSelector samplerSegmentSelector = SegmentSelectorFactory.getSegmentSelector(tableConfig);
+            samplerSegmentSelector.init(idealState, externalView, samplerPreSelectedOnlineSegments);
+
+            List<SegmentPruner> samplerSegmentPruners =
+                SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+            AdaptiveServerSelector samplerAdaptiveServerSelector =
+                AdaptiveServerSelectorFactory.getAdaptiveServerSelector(_serverRoutingStatsManager, _pinotConfig);

Review Comment:
   They can be shared with the main routing



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

