Copilot commented on code in PR #17291: URL: https://github.com/apache/pinot/pull/17291#discussion_r2748087068
########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SystemTableBrokerRequestHandler.java: ########## @@ -0,0 +1,537 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.requesthandler; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import javax.annotation.Nullable; +import javax.net.ssl.SSLContext; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.broker.api.AccessControl; +import org.apache.pinot.broker.broker.AccessControlFactory; +import 
org.apache.pinot.broker.querylog.QueryLogger; +import org.apache.pinot.broker.queryquota.QueryQuotaManager; +import org.apache.pinot.client.ConnectionTimeouts; +import org.apache.pinot.client.PinotClientException; +import org.apache.pinot.client.SystemTableDataTableClient; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.common.datatable.DataTableImplV4; +import org.apache.pinot.common.metrics.BrokerMeter; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.request.QuerySource; +import org.apache.pinot.common.response.BrokerResponse; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.systemtable.SystemTableProvider; +import org.apache.pinot.common.systemtable.SystemTableRegistry; +import org.apache.pinot.common.utils.NamedThreadFactory; +import org.apache.pinot.common.utils.config.InstanceUtils; +import org.apache.pinot.common.utils.config.QueryOptionsUtils; +import org.apache.pinot.common.utils.request.RequestUtils; +import org.apache.pinot.core.instance.context.BrokerContext; +import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; +import org.apache.pinot.core.plan.Plan; +import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2; +import org.apache.pinot.core.plan.maker.PlanMaker; +import org.apache.pinot.core.query.reduce.BrokerReduceService; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; +import org.apache.pinot.core.routing.MultiClusterRoutingContext; +import org.apache.pinot.core.routing.RoutingManager; +import org.apache.pinot.core.transport.ServerRoutingInstance; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.SegmentContext; +import 
org.apache.pinot.spi.accounting.ThreadAccountant; +import org.apache.pinot.spi.auth.AuthorizationResult; +import org.apache.pinot.spi.auth.broker.RequesterIdentity; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.exception.BadQueryRequestException; +import org.apache.pinot.spi.exception.QueryErrorCode; +import org.apache.pinot.spi.query.QueryExecutionContext; +import org.apache.pinot.spi.query.QueryThreadContext; +import org.apache.pinot.spi.trace.RequestContext; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.sql.parsers.CalciteSqlParser; +import org.apache.pinot.sql.parsers.SqlNodeAndOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Broker request handler for system tables (handled entirely on the broker). + */ +public class SystemTableBrokerRequestHandler extends BaseBrokerRequestHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(SystemTableBrokerRequestHandler.class); + private static final String SYSTEM_TABLE_PSEUDO_HOST = "localhost"; + private static final int SYSTEM_TABLE_PSEUDO_PORT = 0; + private static final String SYSTEM_TABLE_DATATABLE_API_PATH = "/query/systemTable/datatable"; + // Hop-by-hop headers per RFC 7230 plus content-length/host which are request-specific. 
+ private static final Set<String> HOP_BY_HOP_HEADERS_TO_SKIP = Set.of( + "connection", + "keep-alive", + "proxy-authenticate", + "proxy-authorization", + "te", + "trailer", + "transfer-encoding", + "upgrade", + "host", + "content-length"); + + private final BrokerReduceService _brokerReduceService; + private final PlanMaker _planMaker; + private final ExecutorService _executorService; + private final ExecutorService _scatterGatherExecutorService; + private final SystemTableDataTableClient _systemTableDataTableClient; + @Nullable + private final HelixManager _helixManager; + + public SystemTableBrokerRequestHandler(PinotConfiguration config, String brokerId, + BrokerRequestIdGenerator requestIdGenerator, RoutingManager routingManager, + AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, + ThreadAccountant threadAccountant, @Nullable MultiClusterRoutingContext multiClusterRoutingContext, + @Nullable HelixManager helixManager) { + super(config, brokerId, requestIdGenerator, routingManager, accessControlFactory, queryQuotaManager, tableCache, + threadAccountant, multiClusterRoutingContext); + _brokerReduceService = new BrokerReduceService(_config); + _planMaker = new InstancePlanMakerImplV2(); + _planMaker.init(_config); + _helixManager = helixManager; + int executorPoolSize = config.getProperty(CommonConstants.Broker.CONFIG_OF_SYSTEM_TABLE_EXECUTOR_POOL_SIZE, + CommonConstants.Broker.DEFAULT_SYSTEM_TABLE_EXECUTOR_POOL_SIZE); + executorPoolSize = Math.max(1, executorPoolSize); + _executorService = QueryThreadContext.contextAwareExecutorService(Executors.newFixedThreadPool(executorPoolSize, + new NamedThreadFactory("system-table-query-executor"))); + _scatterGatherExecutorService = + QueryThreadContext.contextAwareExecutorService(Executors.newFixedThreadPool(executorPoolSize, + new NamedThreadFactory("system-table-scatter-gather-executor"))); + SSLContext sslContext = 
BrokerContext.getInstance().getClientHttpsContext(); + int timeoutMs = (int) Math.min(Integer.MAX_VALUE, _brokerTimeoutMs); + if (timeoutMs < 1) { + timeoutMs = 1; + } + ConnectionTimeouts connectionTimeouts = ConnectionTimeouts.create(timeoutMs, timeoutMs, timeoutMs); + _systemTableDataTableClient = + new SystemTableDataTableClient(connectionTimeouts, sslContext); Review Comment: The SystemTableDataTableClient is instantiated with specific connection timeouts and SSL context, but there are no tests covering error handling when SSL context is null or when connection timeouts are invalid. Consider adding test coverage for these edge cases. ########## pinot-plugins/pinot-system-table/src/main/java/org/apache/pinot/systemtable/provider/TablesSystemTableProvider.java: ########## @@ -0,0 +1,662 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.systemtable.provider; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.IntFunction; +import javax.annotation.Nullable; +import org.apache.helix.HelixAdmin; +import org.apache.pinot.client.admin.PinotAdminClient; +import org.apache.pinot.client.admin.PinotAdminTransport; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.systemtable.SystemTable; +import org.apache.pinot.common.systemtable.SystemTableProvider; +import org.apache.pinot.common.systemtable.datasource.InMemorySystemTableSegment; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Basic system table exposing table-level metadata populated from the broker {@link TableCache}. 
+ */ +@SystemTable +public final class TablesSystemTableProvider implements SystemTableProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(TablesSystemTableProvider.class); + public static final String TABLE_NAME = "system.tables"; + private static final String SIZE_CACHE_TTL_MS_PROPERTY = "pinot.systemtable.tables.sizeCacheTtlMs"; + private static final long DEFAULT_SIZE_CACHE_TTL_MS = Duration.ofMinutes(1).toMillis(); + private static final long SIZE_CACHE_TTL_MS = getNonNegativeLongProperty(SIZE_CACHE_TTL_MS_PROPERTY, + DEFAULT_SIZE_CACHE_TTL_MS); + + private static final String CONTROLLER_TIMEOUT_MS_PROPERTY = "pinot.systemtable.tables.controllerTimeoutMs"; + private static final long DEFAULT_CONTROLLER_TIMEOUT_MS = Duration.ofSeconds(5).toMillis(); + private static final long CONTROLLER_TIMEOUT_MS = getPositiveLongProperty(CONTROLLER_TIMEOUT_MS_PROPERTY, + DEFAULT_CONTROLLER_TIMEOUT_MS); + + private static final long SIZE_FETCH_FAILURE_WARN_INTERVAL_MS = Duration.ofHours(1).toMillis(); + + private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) + .addSingleValueDimension("tableName", FieldSpec.DataType.STRING) + .addSingleValueDimension("type", FieldSpec.DataType.STRING) + .addSingleValueDimension("status", FieldSpec.DataType.STRING) + .addSingleValueDimension("segments", FieldSpec.DataType.INT) + .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG) + .addMetric("reportedSize", FieldSpec.DataType.LONG) + .addMetric("estimatedSize", FieldSpec.DataType.LONG) Review Comment: The column names `reportedSize` and `estimatedSize` do not indicate the unit of measurement, which makes them ambiguous (bytes vs. documents vs. segments). Consider renaming them to `reportedSizeBytes` and `estimatedSizeBytes` to clarify the unit and maintain consistency with other size-related fields. 
########## pinot-common/src/main/java/org/apache/pinot/common/systemtable/SystemTableRegistry.java: ########## @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.systemtable; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.helix.HelixAdmin; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.PinotReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Registry to hold and lifecycle-manage system table data providers. + */ +public final class SystemTableRegistry { + private static final Logger LOGGER = LoggerFactory.getLogger(SystemTableRegistry.class); + + // Providers are registered once at broker startup. 
+ private static final Map<String, SystemTableProvider> PROVIDERS = new HashMap<>(); + + private static final class ConstructorSpec { + final Class<?>[] _paramTypes; + final Object[] _args; + + ConstructorSpec(Class<?>[] paramTypes, Object[] args) { + _paramTypes = paramTypes; + _args = args; + } + } + + private SystemTableRegistry() { + throw new UnsupportedOperationException("This is a utility class and cannot be instantiated"); + } + + public static void register(SystemTableProvider provider) { + synchronized (PROVIDERS) { + PROVIDERS.put(normalize(provider.getTableName()), provider); + } + } + + @Nullable + public static SystemTableProvider get(String tableName) { + synchronized (PROVIDERS) { + return PROVIDERS.get(normalize(tableName)); + } + } + + public static boolean isRegistered(String tableName) { + synchronized (PROVIDERS) { + return PROVIDERS.containsKey(normalize(tableName)); + } + } + + public static Collection<SystemTableProvider> getProviders() { + synchronized (PROVIDERS) { + return Collections.unmodifiableCollection(new java.util.ArrayList<>(PROVIDERS.values())); + } + } + + /** + * Discover and register providers marked with {@link SystemTable} using the available dependencies. + * Follows the ScalarFunction pattern: any class annotated with @SystemTable under a "*.systemtable.*" package + * will be discovered via reflection and registered. 
+ */ + public static void registerAnnotatedProviders(TableCache tableCache, HelixAdmin helixAdmin, String clusterName, + @Nullable PinotConfiguration config) { + Set<Class<?>> classes = + PinotReflectionUtils.getClassesThroughReflection(".*\\.systemtable\\..*", SystemTable.class); + for (Class<?> clazz : classes) { + if (!SystemTableProvider.class.isAssignableFrom(clazz)) { + continue; + } + SystemTableProvider provider = + instantiateProvider(clazz.asSubclass(SystemTableProvider.class), tableCache, helixAdmin, clusterName, config); + if (provider == null) { + continue; + } + if (isRegistered(provider.getTableName())) { + continue; + } + LOGGER.info("Registering system table provider: {}", provider.getTableName()); + register(provider); + } + } + + public static void close() + throws Exception { + Exception firstException = null; + // Snapshot providers to avoid concurrent modifications and to close each provider at most once. + Map<SystemTableProvider, Boolean> providersToClose = new IdentityHashMap<>(); + synchronized (PROVIDERS) { + for (SystemTableProvider provider : PROVIDERS.values()) { + providersToClose.put(provider, Boolean.TRUE); + } + } + try { + for (SystemTableProvider provider : providersToClose.keySet()) { Review Comment: Using `IdentityHashMap<SystemTableProvider, Boolean>` to track unique providers for closing is unconventional. Consider using `Collections.newSetFromMap(new IdentityHashMap<>())` to create an identity-based Set, which more clearly expresses the intent of tracking unique provider instances. 
```suggestion Set<SystemTableProvider> providersToClose = Collections.newSetFromMap(new IdentityHashMap<SystemTableProvider, Boolean>()); synchronized (PROVIDERS) { for (SystemTableProvider provider : PROVIDERS.values()) { providersToClose.add(provider); } } try { for (SystemTableProvider provider : providersToClose) { ``` ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SystemTableBrokerRequestHandler.java: ########## @@ -0,0 +1,537 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.broker.requesthandler; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import javax.annotation.Nullable; +import javax.net.ssl.SSLContext; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.broker.api.AccessControl; +import org.apache.pinot.broker.broker.AccessControlFactory; +import org.apache.pinot.broker.querylog.QueryLogger; +import org.apache.pinot.broker.queryquota.QueryQuotaManager; +import org.apache.pinot.client.ConnectionTimeouts; +import org.apache.pinot.client.PinotClientException; +import org.apache.pinot.client.SystemTableDataTableClient; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.common.datatable.DataTableImplV4; +import org.apache.pinot.common.metrics.BrokerMeter; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.request.QuerySource; +import org.apache.pinot.common.response.BrokerResponse; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.systemtable.SystemTableProvider; +import org.apache.pinot.common.systemtable.SystemTableRegistry; +import org.apache.pinot.common.utils.NamedThreadFactory; +import 
org.apache.pinot.common.utils.config.InstanceUtils; +import org.apache.pinot.common.utils.config.QueryOptionsUtils; +import org.apache.pinot.common.utils.request.RequestUtils; +import org.apache.pinot.core.instance.context.BrokerContext; +import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; +import org.apache.pinot.core.plan.Plan; +import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2; +import org.apache.pinot.core.plan.maker.PlanMaker; +import org.apache.pinot.core.query.reduce.BrokerReduceService; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; +import org.apache.pinot.core.routing.MultiClusterRoutingContext; +import org.apache.pinot.core.routing.RoutingManager; +import org.apache.pinot.core.transport.ServerRoutingInstance; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.SegmentContext; +import org.apache.pinot.spi.accounting.ThreadAccountant; +import org.apache.pinot.spi.auth.AuthorizationResult; +import org.apache.pinot.spi.auth.broker.RequesterIdentity; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.exception.BadQueryRequestException; +import org.apache.pinot.spi.exception.QueryErrorCode; +import org.apache.pinot.spi.query.QueryExecutionContext; +import org.apache.pinot.spi.query.QueryThreadContext; +import org.apache.pinot.spi.trace.RequestContext; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.sql.parsers.CalciteSqlParser; +import org.apache.pinot.sql.parsers.SqlNodeAndOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Broker request handler for system tables (handled entirely on the broker). 
+ */ +public class SystemTableBrokerRequestHandler extends BaseBrokerRequestHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(SystemTableBrokerRequestHandler.class); + private static final String SYSTEM_TABLE_PSEUDO_HOST = "localhost"; + private static final int SYSTEM_TABLE_PSEUDO_PORT = 0; + private static final String SYSTEM_TABLE_DATATABLE_API_PATH = "/query/systemTable/datatable"; + // Hop-by-hop headers per RFC 7230 plus content-length/host which are request-specific. + private static final Set<String> HOP_BY_HOP_HEADERS_TO_SKIP = Set.of( + "connection", + "keep-alive", + "proxy-authenticate", + "proxy-authorization", + "te", + "trailer", + "transfer-encoding", + "upgrade", + "host", + "content-length"); + + private final BrokerReduceService _brokerReduceService; + private final PlanMaker _planMaker; + private final ExecutorService _executorService; + private final ExecutorService _scatterGatherExecutorService; + private final SystemTableDataTableClient _systemTableDataTableClient; + @Nullable + private final HelixManager _helixManager; + + public SystemTableBrokerRequestHandler(PinotConfiguration config, String brokerId, + BrokerRequestIdGenerator requestIdGenerator, RoutingManager routingManager, + AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, + ThreadAccountant threadAccountant, @Nullable MultiClusterRoutingContext multiClusterRoutingContext, + @Nullable HelixManager helixManager) { + super(config, brokerId, requestIdGenerator, routingManager, accessControlFactory, queryQuotaManager, tableCache, + threadAccountant, multiClusterRoutingContext); + _brokerReduceService = new BrokerReduceService(_config); + _planMaker = new InstancePlanMakerImplV2(); + _planMaker.init(_config); + _helixManager = helixManager; + int executorPoolSize = config.getProperty(CommonConstants.Broker.CONFIG_OF_SYSTEM_TABLE_EXECUTOR_POOL_SIZE, + 
CommonConstants.Broker.DEFAULT_SYSTEM_TABLE_EXECUTOR_POOL_SIZE); + executorPoolSize = Math.max(1, executorPoolSize); + _executorService = QueryThreadContext.contextAwareExecutorService(Executors.newFixedThreadPool(executorPoolSize, + new NamedThreadFactory("system-table-query-executor"))); + _scatterGatherExecutorService = + QueryThreadContext.contextAwareExecutorService(Executors.newFixedThreadPool(executorPoolSize, + new NamedThreadFactory("system-table-scatter-gather-executor"))); + SSLContext sslContext = BrokerContext.getInstance().getClientHttpsContext(); + int timeoutMs = (int) Math.min(Integer.MAX_VALUE, _brokerTimeoutMs); + if (timeoutMs < 1) { Review Comment: The timeout clamping logic (minimum 1ms) could lead to unexpected behavior if `_brokerTimeoutMs` is 0 or negative, which might be a valid configuration for certain use cases. Consider logging a warning when the timeout is clamped, or documenting why 1ms is the minimum acceptable value. ```suggestion if (timeoutMs < 1) { // Enforce a minimum timeout of 1ms; zero or negative timeouts are not meaningful for client connections. LOGGER.warn("Configured broker timeout {}ms is non-positive; clamping to 1ms for system table client " + "connections", _brokerTimeoutMs); ``` ########## pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminPathUtils.java: ########## @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.client.admin; + +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; + + +/** + * Utility for encoding admin client URL path segments. Stateless and thread-safe. + */ +final class PinotAdminPathUtils { + private PinotAdminPathUtils() { + } + Review Comment: The method lacks a Javadoc comment explaining why the `+` character is replaced with `%20`. This is important because URLEncoder.encode() follows application/x-www-form-urlencoded encoding (where space becomes +), but URL path segments require percent-encoding (where space becomes %20). Add a comment explaining this distinction. ```suggestion /** * Encodes a single URL path segment using UTF-8. * <p> * {@link URLEncoder} implements {@code application/x-www-form-urlencoded} encoding, where spaces * are converted to {@code +}. For URL path segments, spaces must instead be percent-encoded as * {@code %20}, so this method post-processes the encoded value to replace {@code +} with * {@code %20}. */ ``` ########## pinot-plugins/pinot-system-table/src/main/java/org/apache/pinot/systemtable/provider/TablesSystemTableProvider.java: ########## @@ -0,0 +1,662 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.systemtable.provider; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.IntFunction; +import javax.annotation.Nullable; +import org.apache.helix.HelixAdmin; +import org.apache.pinot.client.admin.PinotAdminClient; +import org.apache.pinot.client.admin.PinotAdminTransport; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.systemtable.SystemTable; +import org.apache.pinot.common.systemtable.SystemTableProvider; +import org.apache.pinot.common.systemtable.datasource.InMemorySystemTableSegment; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Basic system table exposing table-level metadata populated from the broker {@link TableCache}. + */ +@SystemTable +public final class TablesSystemTableProvider implements SystemTableProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(TablesSystemTableProvider.class); + public static final String TABLE_NAME = "system.tables"; + private static final String SIZE_CACHE_TTL_MS_PROPERTY = "pinot.systemtable.tables.sizeCacheTtlMs"; + private static final long DEFAULT_SIZE_CACHE_TTL_MS = Duration.ofMinutes(1).toMillis(); + private static final long SIZE_CACHE_TTL_MS = getNonNegativeLongProperty(SIZE_CACHE_TTL_MS_PROPERTY, + DEFAULT_SIZE_CACHE_TTL_MS); + + private static final String CONTROLLER_TIMEOUT_MS_PROPERTY = "pinot.systemtable.tables.controllerTimeoutMs"; + private static final long DEFAULT_CONTROLLER_TIMEOUT_MS = Duration.ofSeconds(5).toMillis(); + private static final long CONTROLLER_TIMEOUT_MS = getPositiveLongProperty(CONTROLLER_TIMEOUT_MS_PROPERTY, + DEFAULT_CONTROLLER_TIMEOUT_MS); + + private static final long SIZE_FETCH_FAILURE_WARN_INTERVAL_MS = Duration.ofHours(1).toMillis(); + + private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) + .addSingleValueDimension("tableName", FieldSpec.DataType.STRING) + .addSingleValueDimension("type", FieldSpec.DataType.STRING) + .addSingleValueDimension("status", FieldSpec.DataType.STRING) + .addSingleValueDimension("segments", FieldSpec.DataType.INT) + .addSingleValueDimension("totalDocs", FieldSpec.DataType.LONG) + .addMetric("reportedSize", FieldSpec.DataType.LONG) + .addMetric("estimatedSize", FieldSpec.DataType.LONG) + .addSingleValueDimension("brokerTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("serverTenant", FieldSpec.DataType.STRING) + .addSingleValueDimension("replicas", FieldSpec.DataType.INT) + .addSingleValueDimension("tableConfig", FieldSpec.DataType.STRING) + .build(); + + 
private final TableCache _tableCache; + private final @Nullable HelixAdmin _helixAdmin; + private final @Nullable String _clusterName; + private final @Nullable Function<String, TableSize> _tableSizeFetcherOverride; + private final List<String> _configuredControllerUrls; + private final Map<String, CachedSize> _sizeCache = new ConcurrentHashMap<>(); + private final Map<String, PinotAdminClient> _adminClientCache = new ConcurrentHashMap<>(); + private final AtomicLong _lastSizeFetchFailureWarnLogMs = new AtomicLong(); + + public TablesSystemTableProvider() { + this(null, null, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache) { + this(tableCache, null, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin) { + this(tableCache, helixAdmin, null, null, null); + } + + public TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, + @Nullable String clusterName) { + this(tableCache, helixAdmin, clusterName, null, null); + } + + TablesSystemTableProvider(TableCache tableCache, @Nullable HelixAdmin helixAdmin, @Nullable String clusterName, + @Nullable Function<String, TableSize> tableSizeFetcherOverride, @Nullable List<String> controllerUrls) { + _tableCache = tableCache; + _helixAdmin = helixAdmin; + _clusterName = clusterName; + _tableSizeFetcherOverride = tableSizeFetcherOverride; + _configuredControllerUrls = controllerUrls != null ? 
new ArrayList<>(controllerUrls) : List.of(); + } + + @Override + public String getTableName() { + return TABLE_NAME; + } + + @Override + public Schema getSchema() { + return SCHEMA; + } + + @Override + public TableConfig getTableConfig() { + return new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); + } + + @Override + public void close() + throws Exception { + for (Map.Entry<String, PinotAdminClient> entry : _adminClientCache.entrySet()) { + try { + entry.getValue().close(); + } catch (Exception e) { + LOGGER.debug("Failed to close admin client for {}: {}", entry.getKey(), e.toString()); + } + } + _adminClientCache.clear(); + } + + @Override + public IndexSegment getDataSource() { + if (_tableCache == null) { + return new InMemorySystemTableSegment(TABLE_NAME, SCHEMA, 0, Collections.emptyMap()); + } + + Set<String> tableNamesWithType = new LinkedHashSet<>(); + for (String tableName : _tableCache.getTableNameMap().values()) { + if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) { + tableNamesWithType.add(tableName); + } + } + List<String> sortedTableNames = new ArrayList<>(tableNamesWithType); + sortedTableNames.sort(Comparator.naturalOrder()); + + List<String> controllerBaseUrls = getControllerBaseUrls(); + Function<String, TableSize> sizeFetcher = getSizeFetcher(); + class TableRow { + final String _tableNameWithType; + final TableType _tableType; + final String _rawTableName; + final @Nullable TableConfig _tableConfig; + private volatile @Nullable String _tableConfigJson; + private volatile @Nullable TableSize _tableSize; + private volatile boolean _tableSizeFetched; + + private TableRow(String tableNameWithType, TableType tableType, String rawTableName, + @Nullable TableConfig tableConfig) { + _tableNameWithType = tableNameWithType; + _tableType = tableType; + _rawTableName = rawTableName; + _tableConfig = tableConfig; + } + + @Nullable + private TableSize getTableSize() { + if (_tableSizeFetched) { + return _tableSize; + 
} + synchronized (this) { + if (_tableSizeFetched) { + return _tableSize; + } + _tableSize = fetchTableSize(_tableNameWithType, sizeFetcher, controllerBaseUrls); + _tableSizeFetched = true; + return _tableSize; + } + } + + private String getStatus() { + if (_tableConfig != null) { + return "ONLINE"; + } + TableSize sizeFromController = getTableSize(); + int segments = sizeFromController != null ? getSegmentCount(sizeFromController, _tableType) : 0; + return segments > 0 ? "ONLINE" : "UNKNOWN"; + } + + private int getSegments() { + TableSize sizeFromController = getTableSize(); + return sizeFromController != null ? getSegmentCount(sizeFromController, _tableType) : 0; + } + + private long getTotalDocs() { + TableSize sizeFromController = getTableSize(); + return sizeFromController != null ? TablesSystemTableProvider.this.getTotalDocs(sizeFromController, _tableType, + _tableNameWithType, controllerBaseUrls) : 0L; + } + + private long getReportedSize() { + TableSize sizeFromController = getTableSize(); + if (sizeFromController == null || sizeFromController._reportedSizeInBytes < 0) { + return 0L; + } + return sizeFromController._reportedSizeInBytes; + } + + private long getEstimatedSize() { + TableSize sizeFromController = getTableSize(); + if (sizeFromController == null || sizeFromController._estimatedSizeInBytes < 0) { + return 0L; + } + return sizeFromController._estimatedSizeInBytes; + } + + private String getBrokerTenant() { + if (_tableConfig != null && _tableConfig.getTenantConfig() != null) { + String tenant = _tableConfig.getTenantConfig().getBroker(); + return tenant != null ? tenant : ""; + } + return ""; + } + + private String getServerTenant() { + if (_tableConfig != null && _tableConfig.getTenantConfig() != null) { + String tenant = _tableConfig.getTenantConfig().getServer(); + return tenant != null ? 
tenant : ""; + } + return ""; + } + + private int getReplicas() { + if (_tableConfig != null && _tableConfig.getValidationConfig() != null) { + Integer replicationNumber = _tableConfig.getValidationConfig().getReplicationNumber(); + if (replicationNumber != null) { + return replicationNumber; + } + } + return 0; + } + + private String getTableConfigJson() { + String cached = _tableConfigJson; + if (cached != null) { + return cached; + } + synchronized (this) { + cached = _tableConfigJson; + if (cached != null) { + return cached; + } + cached = ""; + if (_tableConfig != null) { + try { + cached = JsonUtils.objectToString(_tableConfig); + } catch (Exception e) { + LOGGER.warn("Failed to serialize table config for {}: {}", _tableNameWithType, e.toString()); + cached = _tableConfig.toString(); + } + } + _tableConfigJson = cached; + return cached; + } + } + } + + List<TableRow> tableRows = new ArrayList<>(sortedTableNames.size()); + for (String tableNameWithType : sortedTableNames) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType == null) { + continue; + } + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType); + tableRows.add(new TableRow(tableNameWithType, tableType, rawTableName, tableConfig)); + } + + Map<String, IntFunction<Object>> valueProviders = new java.util.HashMap<>(); + valueProviders.put("tableName", docId -> tableRows.get(docId)._rawTableName); + valueProviders.put("type", docId -> tableRows.get(docId)._tableType.name()); + valueProviders.put("status", docId -> tableRows.get(docId).getStatus()); + valueProviders.put("segments", docId -> tableRows.get(docId).getSegments()); + valueProviders.put("totalDocs", docId -> tableRows.get(docId).getTotalDocs()); + valueProviders.put("reportedSize", docId -> tableRows.get(docId).getReportedSize()); + valueProviders.put("estimatedSize", docId -> 
tableRows.get(docId).getEstimatedSize()); + valueProviders.put("brokerTenant", docId -> tableRows.get(docId).getBrokerTenant()); + valueProviders.put("serverTenant", docId -> tableRows.get(docId).getServerTenant()); + valueProviders.put("replicas", docId -> tableRows.get(docId).getReplicas()); + valueProviders.put("tableConfig", docId -> tableRows.get(docId).getTableConfigJson()); + + return new InMemorySystemTableSegment(TABLE_NAME, SCHEMA, tableRows.size(), valueProviders); + } + + @Nullable + private TableSize fetchTableSize(String tableNameWithType, + @Nullable Function<String, TableSize> fetcher, List<String> controllerBaseUrls) { + boolean cacheEnabled = SIZE_CACHE_TTL_MS > 0; + TableSize cached = cacheEnabled ? getCachedSize(tableNameWithType) : null; + if (cached != null) { + return cached; + } + if (fetcher != null) { + try { + TableSize fetched = fetcher.apply(tableNameWithType); + if (fetched != null) { + if (cacheEnabled) { + cacheSize(tableNameWithType, fetched); + } + return fetched; + } + } catch (Exception e) { + LOGGER.warn("Table size fetcher failed for {}: {}", tableNameWithType, e.toString()); + } + } + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + TableSize size = fetchTableSizeForName(controllerBaseUrls, rawTableName); + if (size == null) { + size = fetchTableSizeForName(controllerBaseUrls, tableNameWithType); + if (size == null) { + logSizeFetchFailure("{}: failed to fetch size for {} from controllers {} " + + "(tried raw table name '{}' and table name with type '{}')", + TABLE_NAME, tableNameWithType, controllerBaseUrls, rawTableName, tableNameWithType); + } + } + if (size != null && cacheEnabled) { + cacheSize(tableNameWithType, size); + } + return size; + } + + @Nullable + private TableSize fetchTableSizeForName(List<String> controllerBaseUrls, String tableName) { + for (String baseUrl : controllerBaseUrls) { + try { + PinotAdminClient adminClient = getOrCreateAdminClient(baseUrl); + if (adminClient == 
null) { + continue; + } + + JsonNode sizeNode = adminClient.getTableSize(tableName, true, false); + + if (sizeNode == null) { + continue; + } + + TableSize parsed = JsonUtils.stringToObject(sizeNode.toString(), TableSize.class); + LOGGER.debug("{}: controller size response for {} via {} -> segments offline={}, realtime={}, " + + "reportedSize={}, estimatedSize={}", TABLE_NAME, tableName, baseUrl, + parsed._offlineSegments != null && parsed._offlineSegments._segments != null + ? parsed._offlineSegments._segments.size() : 0, + parsed._realtimeSegments != null && parsed._realtimeSegments._segments != null + ? parsed._realtimeSegments._segments.size() : 0, + parsed._reportedSizeInBytes, parsed._estimatedSizeInBytes); + return parsed; + } catch (Exception e) { + logSizeFetchFailure("{}: error fetching table size for {} via {} using admin client", TABLE_NAME, tableName, + baseUrl, e); + } + } + return null; + } + + private List<String> getControllerBaseUrls() { + Set<String> urls = new LinkedHashSet<>(); + if (_helixAdmin != null) { + for (String controller : discoverControllersFromHelix()) { + String normalized = normalizeControllerUrl(controller); + if (normalized != null) { + urls.add(normalized); + } + } + } + for (String url : _configuredControllerUrls) { + String normalized = normalizeControllerUrl(url); + if (normalized != null) { + urls.add(normalized); + } + } + return new ArrayList<>(urls); + } + + private int getSegmentCount(TableSize sizeFromController, TableType tableType) { + if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments != null + && sizeFromController._offlineSegments._segments != null) { + return sizeFromController._offlineSegments._segments.size(); + } + if (tableType == TableType.REALTIME && sizeFromController._realtimeSegments != null + && sizeFromController._realtimeSegments._segments != null) { + return sizeFromController._realtimeSegments._segments.size(); + } + return 0; + } + + private long 
getTotalDocsFromSize(TableSize sizeFromController, TableType tableType) { + if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments != null + && sizeFromController._offlineSegments._segments != null) { + return sizeFromController._offlineSegments._segments.values().stream() + .mapToLong(segmentSize -> segmentSize._totalDocs).sum(); + } + if (tableType == TableType.REALTIME && sizeFromController._realtimeSegments != null + && sizeFromController._realtimeSegments._segments != null) { + return sizeFromController._realtimeSegments._segments.values().stream() + .mapToLong(segmentSize -> segmentSize._totalDocs).sum(); + } + return 0; + } + + private long getTotalDocs(TableSize sizeFromController, TableType tableType, String tableNameWithType, + List<String> controllerBaseUrls) { + if (tableType == TableType.OFFLINE && sizeFromController._offlineSegments != null + && sizeFromController._offlineSegments._segments != null) { + long cached = sizeFromController._offlineTotalDocs; + if (cached >= 0) { + return cached; + } + long totalDocsFromSize = getTotalDocsFromSize(sizeFromController, tableType); + if (totalDocsFromSize > 0) { + synchronized (sizeFromController) { + if (sizeFromController._offlineTotalDocs < 0) { + sizeFromController._offlineTotalDocs = totalDocsFromSize; + } + return sizeFromController._offlineTotalDocs; + } + } + long fetched = fetchTotalDocsFromSegmentMetadata(tableNameWithType, sizeFromController._offlineSegments._segments, + controllerBaseUrls); Review Comment: The method fetches totalDocs by iterating through all segments and making individual controller API calls for each segment's metadata. For tables with many segments, this could result in a large number of network round trips. Consider optimizing by batching segment metadata requests or using a controller API that returns totalDocs for all segments in a single call. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
