Copilot commented on code in PR #17532: URL: https://github.com/apache/pinot/pull/17532#discussion_r2767192770
########## pinot-spi/src/main/java/org/apache/pinot/spi/config/table/sampler/TableSamplerConfig.java: ########## @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.config.table.sampler; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.spi.config.BaseJsonConfig; + + +/** + * Configuration for a table sampler. + * + * Samplers are defined in {@link org.apache.pinot.spi.config.table.TableConfig} and can be selected at query time + * via a query option. The sampler type can be one of the built-in types, a fully qualified class name, or an alias + * discovered via {@code pinot.broker.table.sampler.annotation.packages}. + */ +public class TableSamplerConfig extends BaseJsonConfig { + private final String _name; + private final String _type; + private final Map<String, String> _properties; + + @JsonCreator + public TableSamplerConfig(@JsonProperty(value = "name", required = true) String name, + @JsonProperty(value = "type", required = true) String type, + @JsonProperty("properties") @Nullable Map<String, String> properties) { Review Comment: The Javadoc comment for this class references a package path in the NOTE section that may become outdated if the TableSampler interface is moved. Consider using a less specific reference or adding a check to ensure the documentation stays synchronized with the actual implementation location. ########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/tablesampler/TableSamplerFactory.java: ########## @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.routing.tablesampler; + +import com.google.common.annotations.VisibleForTesting; +import java.lang.reflect.Modifier; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.spi.annotations.tablesampler.TableSamplerProvider; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.plugin.PluginManager; +import org.apache.pinot.spi.utils.PinotReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class TableSamplerFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(TableSamplerFactory.class); + private static final String ANNOTATION_PACKAGES_KEY = "annotation.packages"; Review Comment: The DEFAULT_ANNOTATION_PACKAGES is hardcoded to a single package. If built-in samplers are added outside this package, they won't be discovered by default. Consider documenting this limitation or making the default list configurable via external configuration. ```suggestion private static final String ANNOTATION_PACKAGES_KEY = "annotation.packages"; // NOTE: By default we only scan this package for @TableSamplerProvider annotations. // If built-in samplers are added in other packages, callers must configure additional // packages via the `annotation.packages` key; they will not be discovered otherwise. ``` ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/TableSamplerIntegrationTest.java: ########## @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests.custom; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.Assert; +import org.testng.annotations.Test; + + +@Test(suiteName = "CustomClusterIntegrationTest") +public class TableSamplerIntegrationTest extends CustomDataQueryClusterIntegrationTest { + private static final int DAYS = 7; + private static final int SEGMENTS_PER_DAY = 10; + private static final int RECORDS_PER_SEGMENT = 1; + private static final int BASE_DAY = 20000; + + private static final String DAYS_SINCE_EPOCH_COL = "DaysSinceEpoch"; + + @Override + public String getTableName() { + return "TableSamplerIntegrationTest"; + } + + @Override + protected long getCountStarResult() { + return (long) DAYS * SEGMENTS_PER_DAY * RECORDS_PER_SEGMENT; + } + + @Override + public Schema createSchema() { + return new Schema.SchemaBuilder().setSchemaName(getTableName()) + .addDateTime(DAYS_SINCE_EPOCH_COL, FieldSpec.DataType.INT, "1:DAYS:EPOCH", "1:DAYS") + .build(); + } + + @Override + public TableConfig createOfflineTableConfig() { + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()) + .setTimeColumnName(DAYS_SINCE_EPOCH_COL) + .setTimeType("DAYS") + .build(); + tableConfig.setTableSamplers(List.of( + new TableSamplerConfig("perDay", "timeBucket", Map.of("numSegmentsPerDay", "1")))); + return tableConfig; + } + + @Override + public List<File> createAvroFiles() + throws Exception { + org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); + avroSchema.setFields(List.of( + new org.apache.avro.Schema.Field(DAYS_SINCE_EPOCH_COL, + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT), null, null))); + + List<File> files = new ArrayList<>(); + for (int day = 0; day < DAYS; day++) { + int dayValue = BASE_DAY + day; + for (int seg = 0; seg < SEGMENTS_PER_DAY; seg++) { + File avroFile = new File(_tempDir, "data_day_" + day + "_seg_" + seg + ".avro"); + try (DataFileWriter<GenericData.Record> fileWriter = + new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) { + fileWriter.create(avroSchema, avroFile); + for (int docId = 0; docId < RECORDS_PER_SEGMENT; docId++) { + GenericData.Record record = new GenericData.Record(avroSchema); + record.put(DAYS_SINCE_EPOCH_COL, dayValue); + fileWriter.append(record); + } + } + files.add(avroFile); + } + } + return files; + } + + @Test(dataProvider = "useBothQueryEngines") + public void testTimeBucketSamplerForGroupByDay(boolean useMultiStageQueryEngine) Review Comment: The integration test only validates the `timeBucket` sampler with `numSegmentsPerDay=1`. Consider adding test cases for the `firstN` sampler and edge cases such as empty segment sets or invalid sampler configurations to ensure comprehensive coverage. ########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java: ########## @@ -1087,10 +1277,37 @@ private Map<ServerInstance, SegmentsToQuery> getServerInstanceToSegmentsMap(Stri @Override public List<String> getSegments(BrokerRequest brokerRequest) { String tableNameWithType = brokerRequest.getQuerySource().getTableName(); - RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType); + RoutingEntry routingEntry = getRoutingEntry(brokerRequest, tableNameWithType); return routingEntry != null ? routingEntry.getSegments(brokerRequest) : null; } + @Nullable + private RoutingEntry getRoutingEntry(BrokerRequest brokerRequest, String tableNameWithType) { + String samplerName = extractSamplerName(brokerRequest); + if (StringUtils.isNotBlank(samplerName)) { + Map<String, RoutingEntry> samplerEntries = _samplerRoutingEntryMap.get(tableNameWithType); + RoutingEntry samplerEntry = samplerEntries != null ? samplerEntries.get(samplerName) : null; + if (samplerEntry != null) { + return samplerEntry; + } + LOGGER.warn("Requested sampler '{}' not found for table '{}'; falling back to default routing entry", + samplerName, tableNameWithType); Review Comment: This warning will be logged on every query when a non-existent sampler is requested, potentially generating excessive log noise for misconfigured clients. Consider rate-limiting this warning or logging it at debug level after the first occurrence. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
