This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4cd508d7a8 Cache configs for logical table context in server (#15881)
4cd508d7a8 is described below
commit 4cd508d7a84e653b1d85eca93e270988423385a6
Author: Abhishek Bafna <[email protected]>
AuthorDate: Tue Jun 10 12:55:31 2025 +0530
Cache configs for logical table context in server (#15881)
---
.../HelixExternalViewBasedQueryQuotaManager.java | 3 +-
.../config/provider/LogicalTableMetadataCache.java | 321 +++++++++++++++++++++
.../pinot/common/config/provider/TableCache.java | 15 +-
.../pinot/common/metadata/ZKMetadataProvider.java | 6 +-
.../PinotHelixPropertyStoreZnRecordProvider.java | 3 +-
.../controller/helix/ControllerRequestClient.java | 11 +
.../helix/core/PinotHelixResourceManager.java | 6 +
.../pinot/controller/helix/ControllerTest.java | 5 +
.../helix/LogicalTableMetadataCacheTest.java | 260 +++++++++++++++++
.../BaseLogicalTableIntegrationTest.java | 4 +-
.../plan/server/ServerPlanRequestUtils.java | 13 +-
.../starter/helix/HelixInstanceDataManager.java | 22 +-
.../apache/pinot/spi/data/LogicalTableConfig.java | 25 ++
.../apache/pinot/spi/utils/CommonConstants.java | 10 +
14 files changed, 676 insertions(+), 28 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index 7bb8641271..46009aab74 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -51,6 +51,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.ZkPaths;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -960,6 +961,6 @@ public class HelixExternalViewBasedQueryQuotaManager
implements ClusterChangeHan
}
private String constructLogicalTableConfigPath(String tableName) {
- return "/LOGICAL/TABLE/" + tableName;
+ return ZkPaths.LOGICAL_TABLE_PATH_PREFIX + tableName;
}
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/LogicalTableMetadataCache.java
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/LogicalTableMetadataCache.java
new file mode 100644
index 0000000000..62982ea680
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/LogicalTableMetadataCache.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.config.provider;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.helix.AccessOption;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.zkclient.IZkChildListener;
+import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.utils.LogicalTableConfigUtils;
+import org.apache.pinot.common.utils.SchemaUtils;
+import org.apache.pinot.common.utils.config.TableConfigUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants.ZkPaths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * LogicalTableMetadataCache maintains the cache for logical tables, which
includes the logical table configs,
+ * logical table schemas, and reference offline and realtime table configs.
+ * It listens to changes in the ZK property store for all the logical table
configs and updates the cache accordingly.
+ * For schema and table configs, it listens to only those configs that are
required by the logical tables.
+ */
+public class LogicalTableMetadataCache {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(LogicalTableMetadataCache.class);
+
+ private final Map<String, LogicalTableConfig> _logicalTableConfigMap = new
ConcurrentHashMap<>(); // logical table name -> logical table config
+ private final Map<String, Schema> _schemaMap = new ConcurrentHashMap<>(); // schema name -> schema
+ private final Map<String, TableConfig> _tableConfigMap = new
ConcurrentHashMap<>(); // referenced table name -> table config
+ private final Map<String, List<String>> _tableNameToLogicalTableNamesMap =
new ConcurrentHashMap<>(); // referenced table name -> names of the logical tables that reference it
+
+ private ZkHelixPropertyStore<ZNRecord> _propertyStore; // assigned in init()
+ private ZkTableConfigChangeListener _zkTableConfigChangeListener; // created in init()
+ private ZkSchemaChangeListener _zkSchemaChangeListener; // created in init()
+ private ZkLogicalTableConfigChangeListener
_zkLogicalTableConfigChangeListener; // created in init()
+
+ public void init(ZkHelixPropertyStore<ZNRecord> propertyStore) { // Stores the property store, creates the three listeners and registers the child listener.
+ _propertyStore = propertyStore;
+ _zkTableConfigChangeListener = new ZkTableConfigChangeListener();
+ _zkSchemaChangeListener = new ZkSchemaChangeListener();
+ _zkLogicalTableConfigChangeListener = new
ZkLogicalTableConfigChangeListener();
+
+ // Add child listeners to the property store for logical table config
changes
+ _propertyStore.subscribeChildChanges(ZkPaths.LOGICAL_TABLE_PARENT_PATH,
_zkLogicalTableConfigChangeListener); // NOTE(review): existing children are not read here; presumably a later child-change callback fills the cache - confirm
+
+ LOGGER.info("Logical table metadata cache initialized");
+ }
+
+ public void shutdown() { // Removes every subscription made by this cache and empties all four maps. NOTE(review): assumes init() ran first - confirm
+ // Unsubscribe the child listener that reports newly created logical table configs
+ _propertyStore.unsubscribeChildChanges(ZkPaths.LOGICAL_TABLE_PARENT_PATH,
_zkLogicalTableConfigChangeListener);
+
+ // Unsubscribe from all logical table config paths, table config paths,
and schema paths
+ unsubscribeDataChanges(_logicalTableConfigMap.keySet(),
ZkPaths.LOGICAL_TABLE_PATH_PREFIX,
+ _zkLogicalTableConfigChangeListener);
+ unsubscribeDataChanges(_tableConfigMap.keySet(),
ZkPaths.TABLE_CONFIG_PATH_PREFIX, _zkTableConfigChangeListener);
+ unsubscribeDataChanges(_schemaMap.keySet(), ZkPaths.SCHEMA_PATH_PREFIX,
_zkSchemaChangeListener);
+
+ // Clear all caches (only after the key sets above were used to build the paths)
+ _logicalTableConfigMap.clear();
+ _schemaMap.clear();
+ _tableConfigMap.clear();
+ _tableNameToLogicalTableNamesMap.clear();
+
+ LOGGER.info("Logical table metadata cache shutdown");
+ }
+
+ private void unsubscribeDataChanges(Set<String> resourceNames, String
pathPrefix,
+ IZkDataListener changeListener) { // Unsubscribes changeListener from pathPrefix + name for every given name.
+ for (String resource : resourceNames) {
+ String logicalTableConfigPath = pathPrefix + resource; // despite the variable name, shutdown() also passes table config and schema prefixes
+ _propertyStore.unsubscribeDataChanges(logicalTableConfigPath,
changeListener);
+ }
+ }
+
+ @Nullable
+ public Schema getSchema(String schemaName) { // Returns the cached schema for the name, or null when it is not cached.
+ return _schemaMap.get(schemaName);
+ }
+
+ @Nullable
+ public TableConfig getTableConfig(String tableName) { // Returns the cached table config for the name, or null when it is not cached.
+ return _tableConfigMap.get(tableName);
+ }
+
+ @Nullable
+ public LogicalTableConfig getLogicalTableConfig(String logicalTableName) { // Returns the cached logical table config, or null when it is not cached.
+ return _logicalTableConfigMap.get(logicalTableName);
+ }
+
+ private class ZkTableConfigChangeListener implements IZkDataListener { // Refreshes cached table configs when their ZNode data changes.
+
+ @Override
+ public synchronized void handleDataChange(String path, Object data) { // Re-parses the record and overwrites the cached entry.
+ if (data != null) { // null payloads are ignored
+ ZNRecord znRecord = (ZNRecord) data;
+ try {
+ TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord);
+ _tableConfigMap.put(tableConfig.getTableName(), tableConfig); // keyed by the name stored in the parsed config, not by the path
+ } catch (Exception e) { // a failed parse is logged and the previously cached config is kept
+ LOGGER.error("Caught exception while refreshing table config for
ZNRecord: {}", znRecord.getId(), e);
+ }
+ }
+ }
+
+ @Override
+ public synchronized void handleDataDeleted(String path) {
+ // no-op, table config should not be deleted while referenced in the
logical table config
+ }
+ }
+
+ private class ZkSchemaChangeListener implements IZkDataListener { // Refreshes cached schemas when their ZNode data changes.
+
+ @Override
+ public synchronized void handleDataChange(String path, Object data) { // Re-parses the record and overwrites the cached entry.
+ if (data != null) { // null payloads are ignored
+ ZNRecord znRecord = (ZNRecord) data;
+ try {
+ Schema schema = SchemaUtils.fromZNRecord(znRecord);
+ _schemaMap.put(schema.getSchemaName(), schema); // keyed by the name stored in the parsed schema, not by the path
+ } catch (Exception e) { // a failed parse is logged and the previously cached schema is kept
+ LOGGER.error("Caught exception while refreshing schema for ZNRecord:
{}", znRecord.getId(), e);
+ }
+ }
+ }
+
+ @Override
+ public synchronized void handleDataDeleted(String path) {
+ // no-op, schema should not be deleted before the logical table config
+ }
+ }
+
+ private class ZkLogicalTableConfigChangeListener implements
IZkChildListener, IZkDataListener {
+
+ @Override
+ public synchronized void handleChildChange(String path, List<String>
logicalTableNames) { // Child-change callback: caches logical tables that are not cached yet.
+ if (CollectionUtils.isEmpty(logicalTableNames)) {
+ return; // nothing to add; cached entries are left untouched
+ }
+
+ // Only process newly added logical tables. Changed/removed logical tables
are handled by other callbacks.
+ List<String> pathsToAdd = new ArrayList<>();
+ for (String logicalTableName : logicalTableNames) {
+ if (!_logicalTableConfigMap.containsKey(logicalTableName)) {
+ pathsToAdd.add(ZkPaths.LOGICAL_TABLE_PATH_PREFIX + logicalTableName);
+ }
+ }
+ if (!pathsToAdd.isEmpty()) {
+ addLogicalTableConfigs(pathsToAdd);
+ }
+ }
+
+ @Override
+ public synchronized void handleDataChange(String path, Object data) { // Data-change callback for a logical table config path.
+ if (data != null) { // null payloads are ignored
+ updateLogicalTableConfig((ZNRecord) data);
+ }
+ }
+
+ @Override
+ public synchronized void handleDataDeleted(String path) { // Delete callback: drops the logical table and what it references from the cache.
+ // NOTE: The path here is the absolute ZK path instead of the relative
path to the property store.
+ String logicalTableName = path.substring(path.lastIndexOf('/') + 1); // the last path segment is used as the logical table name
+ removeLogicalTableConfig(logicalTableName);
+ }
+
+ private synchronized void addLogicalTableConfigs(List<String> pathsToAdd) { // Loads each path and caches its logical table config, schema and referenced table configs.
+ for (String path : pathsToAdd) {
+ ZNRecord znRecord = _propertyStore.get(path, null,
AccessOption.PERSISTENT);
+ if (znRecord != null) { // paths without a record are skipped without logging
+ try {
+ LogicalTableConfig logicalTableConfig =
LogicalTableConfigUtils.fromZNRecord(znRecord);
+ String logicalTableName = logicalTableConfig.getTableName();
+
+ if (logicalTableConfig.getRefOfflineTableName() != null) {
+ addTableConfig(logicalTableConfig.getRefOfflineTableName(),
logicalTableName);
+ }
+ if (logicalTableConfig.getRefRealtimeTableName() != null) {
+ addTableConfig(logicalTableConfig.getRefRealtimeTableName(),
logicalTableName);
+ }
+
+ addSchema(logicalTableName); // the schema is looked up by the logical table name
+ _logicalTableConfigMap.put(logicalTableName, logicalTableConfig);
+ String logicalTableConfigPath = ZkPaths.LOGICAL_TABLE_PATH_PREFIX
+ logicalTableName;
+ _propertyStore.subscribeDataChanges(logicalTableConfigPath,
_zkLogicalTableConfigChangeListener); // later updates and deletes arrive through the data listener
+ LOGGER.info("Added the logical table config: {} in cache",
logicalTableName);
+ } catch (Exception e) { // logged per record, remaining paths are still processed. NOTE(review): entries cached before the failure are not rolled back - confirm
+ LOGGER.error("Caught exception while refreshing logical table
config for ZNRecord: {}", znRecord.getId(),
+ e);
+ }
+ }
+ }
+ }
+
+ private synchronized void addTableConfig(String tableName, String
logicalTableName) { // Caches the table config for tableName and records that logicalTableName references it.
+ TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableName); // the precondition on the next statement rejects a missing config
+ Preconditions.checkArgument(tableConfig != null, "Failed to find table
config for table: %s", tableName);
+ _tableNameToLogicalTableNamesMap.computeIfAbsent(tableName, k -> new
ArrayList<>())
+ .add(logicalTableName); // one list entry per referencing logical table
+ _tableConfigMap.put(tableName, tableConfig);
+ String path = ZkPaths.TABLE_CONFIG_PATH_PREFIX + tableName;
+ _propertyStore.subscribeDataChanges(path, _zkTableConfigChangeListener); // keeps the cached table config refreshed on changes
+ LOGGER.info("Added the table config: {} in cache for logical table: {}",
tableName, logicalTableName);
+ }
+
+ private synchronized void addSchema(String logicalTableName) { // Caches the schema fetched under the logical table name and subscribes to its changes.
+ Schema schema = ZKMetadataProvider.getSchema(_propertyStore,
logicalTableName);
+ Preconditions.checkArgument(schema != null,
+ "Failed to find schema for logical table: %s", logicalTableName);
+ _schemaMap.put(schema.getSchemaName(), schema); // keyed by the schema's own name
+ String schemaPath = ZkPaths.SCHEMA_PATH_PREFIX + schema.getSchemaName();
+ _propertyStore.subscribeDataChanges(schemaPath, _zkSchemaChangeListener);
+ LOGGER.info("Added the schema: {} in cache for logical table: {}",
schema.getSchemaName(), logicalTableName);
+ }
+
+ private synchronized void updateLogicalTableConfig(ZNRecord znRecord) { // Replaces the cached logical table config and reconciles the referenced table configs.
+ try {
+ LogicalTableConfig logicalTableConfig =
LogicalTableConfigUtils.fromZNRecord(znRecord);
+ String logicalTableName = logicalTableConfig.getTableName();
+ LogicalTableConfig oldLogicalTableConfig =
_logicalTableConfigMap.put(logicalTableName, logicalTableConfig); // the new config is already stored before the precondition below runs
+ Preconditions.checkArgument(oldLogicalTableConfig != null,
+ "Logical table config for logical table: %s should have been
created before", logicalTableName);
+
+ // Remove the old table configs from the table config map (only when the reference changed)
+ if (oldLogicalTableConfig.getRefOfflineTableName() != null
+ &&
!oldLogicalTableConfig.getRefOfflineTableName().equals(logicalTableConfig.getRefOfflineTableName()))
{
+ removeTableConfig(oldLogicalTableConfig.getRefOfflineTableName(),
logicalTableName);
+ }
+ if (oldLogicalTableConfig.getRefRealtimeTableName() != null
+ &&
!oldLogicalTableConfig.getRefRealtimeTableName().equals(logicalTableConfig.getRefRealtimeTableName()))
{
+ removeTableConfig(oldLogicalTableConfig.getRefRealtimeTableName(),
logicalTableName);
+ }
+
+ // Add the new table configs to the table config map (only when the reference changed)
+ if (logicalTableConfig.getRefOfflineTableName() != null
+ &&
!logicalTableConfig.getRefOfflineTableName().equals(oldLogicalTableConfig.getRefOfflineTableName()))
{
+ addTableConfig(logicalTableConfig.getRefOfflineTableName(),
logicalTableName);
+ }
+ if (logicalTableConfig.getRefRealtimeTableName() != null
+ &&
!logicalTableConfig.getRefRealtimeTableName().equals(oldLogicalTableConfig.getRefRealtimeTableName()))
{
+ addTableConfig(logicalTableConfig.getRefRealtimeTableName(),
logicalTableName);
+ }
+ LOGGER.info("Updated the logical table config: {} in cache",
logicalTableName);
+ } catch (Exception e) { // any failure above, including the precondition, is logged and not rethrown
+ LOGGER.error("Caught exception while refreshing logical table for
ZNRecord: {}", znRecord.getId(), e);
+ }
+ }
+
+ private synchronized void removeLogicalTableConfig(String
logicalTableName) { // Drops the logical table config, its table config references and its schema from the cache.
+ LogicalTableConfig logicalTableConfig =
_logicalTableConfigMap.remove(logicalTableName);
+ if (logicalTableConfig != null) { // names that were never cached are ignored
+ // Remove the table configs from the table config map
+ String offlineTableName = logicalTableConfig.getRefOfflineTableName();
+ String realtimeTableName =
logicalTableConfig.getRefRealtimeTableName();
+ if (offlineTableName != null) {
+ removeTableConfig(offlineTableName, logicalTableName);
+ }
+ if (realtimeTableName != null) {
+ removeTableConfig(realtimeTableName, logicalTableName);
+ }
+ // remove schema
+ removeSchema(logicalTableConfig);
+ // Unsubscribe from the logical table config path
+ String logicalTableConfigPath = ZkPaths.LOGICAL_TABLE_PATH_PREFIX +
logicalTableName;
+ _propertyStore.unsubscribeDataChanges(logicalTableConfigPath,
_zkLogicalTableConfigChangeListener);
+ LOGGER.info("Removed the logical table config: {} from cache",
logicalTableName);
+ }
+ }
+
+ private synchronized void removeTableConfig(String tableName, String
logicalTableName) { // Removes one logical-table reference; evicts the table config once no reference is left.
+ _tableNameToLogicalTableNamesMap.computeIfPresent(tableName, (k, v) -> {
+ v.remove(logicalTableName);
+ if (v.isEmpty()) { // that was the last logical table referencing this table
+ _tableConfigMap.remove(tableName);
+ String path = ZkPaths.TABLE_CONFIG_PATH_PREFIX + tableName;
+ _propertyStore.unsubscribeDataChanges(path,
_zkTableConfigChangeListener);
+ LOGGER.info("Removed the table config: {} from cache", tableName);
+ return null; // a null result removes the mapping for tableName
+ }
+ return v;
+ });
+ }
+
+ private synchronized void removeSchema(LogicalTableConfig
logicalTableConfig) { // Evicts the schema stored under the logical table's name and unsubscribes from its path.
+ String schemaName = logicalTableConfig.getTableName(); // the logical table name is used as the schema name here
+ _schemaMap.remove(schemaName);
+ String schemaPath = ZkPaths.SCHEMA_PATH_PREFIX + schemaName;
+ _propertyStore.unsubscribeDataChanges(schemaPath,
_zkSchemaChangeListener);
+ LOGGER.info("Removed the schema: {} from cache", schemaName);
+ }
+ }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
index e25948fd61..b2a5ae02e1 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
@@ -52,6 +52,7 @@ import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants.Segment.BuiltInVirtualColumn;
+import org.apache.pinot.spi.utils.CommonConstants.ZkPaths;
import org.apache.pinot.spi.utils.TimestampIndexUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
@@ -70,8 +71,6 @@ public class TableCache implements PinotConfigProvider {
private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
private static final String SCHEMA_PARENT_PATH = "/SCHEMAS";
private static final String SCHEMA_PATH_PREFIX = "/SCHEMAS/";
- private static final String LOGICAL_TABLE_PARENT_PATH = "/LOGICAL/TABLE";
- private static final String LOGICAL_TABLE_PATH_PREFIX = "/LOGICAL/TABLE/";
private static final String OFFLINE_TABLE_SUFFIX = "_OFFLINE";
private static final String REALTIME_TABLE_SUFFIX = "_REALTIME";
private static final String LOWER_CASE_OFFLINE_TABLE_SUFFIX =
OFFLINE_TABLE_SUFFIX.toLowerCase();
@@ -138,12 +137,12 @@ public class TableCache implements PinotConfigProvider {
synchronized (_zkLogicalTableConfigChangeListener) {
// Subscribe child changes before reading the data to avoid missing
changes
- _propertyStore.subscribeChildChanges(LOGICAL_TABLE_PARENT_PATH,
_zkLogicalTableConfigChangeListener);
+ _propertyStore.subscribeChildChanges(ZkPaths.LOGICAL_TABLE_PARENT_PATH,
_zkLogicalTableConfigChangeListener);
- List<String> tables =
_propertyStore.getChildNames(LOGICAL_TABLE_PARENT_PATH,
AccessOption.PERSISTENT);
+ List<String> tables =
_propertyStore.getChildNames(ZkPaths.LOGICAL_TABLE_PARENT_PATH,
AccessOption.PERSISTENT);
if (CollectionUtils.isNotEmpty(tables)) {
List<String> pathsToAdd = tables.stream()
- .map(rawTableName -> LOGICAL_TABLE_PATH_PREFIX + rawTableName)
+ .map(rawTableName -> ZkPaths.LOGICAL_TABLE_PATH_PREFIX +
rawTableName)
.collect(Collectors.toList());
addLogicalTableConfigs(pathsToAdd);
}
@@ -391,7 +390,7 @@ public class TableCache implements PinotConfigProvider {
private void removeLogicalTableConfig(String path) {
_propertyStore.unsubscribeDataChanges(path,
_zkLogicalTableConfigChangeListener);
- String logicalTableName =
path.substring(LOGICAL_TABLE_PATH_PREFIX.length());
+ String logicalTableName =
path.substring(ZkPaths.LOGICAL_TABLE_PATH_PREFIX.length());
_logicalTableConfigInfoMap.remove(logicalTableName);
logicalTableName = _ignoreCase ? logicalTableName.toLowerCase() :
logicalTableName;
_logicalTableNameMap.remove(logicalTableName);
@@ -616,7 +615,7 @@ public class TableCache implements PinotConfigProvider {
List<String> pathsToAdd = new ArrayList<>();
for (String logicalTableName : logicalTableNames) {
if (!_logicalTableConfigInfoMap.containsKey(logicalTableName)) {
- pathsToAdd.add(LOGICAL_TABLE_PATH_PREFIX + logicalTableName);
+ pathsToAdd.add(ZkPaths.LOGICAL_TABLE_PATH_PREFIX + logicalTableName);
}
}
if (!pathsToAdd.isEmpty()) {
@@ -642,7 +641,7 @@ public class TableCache implements PinotConfigProvider {
public synchronized void handleDataDeleted(String path) {
// NOTE: The path here is the absolute ZK path instead of the relative
path to the property store.
String logicalTableName = path.substring(path.lastIndexOf('/') + 1);
- removeLogicalTableConfig(LOGICAL_TABLE_PATH_PREFIX + logicalTableName);
+ removeLogicalTableConfig(ZkPaths.LOGICAL_TABLE_PATH_PREFIX +
logicalTableName);
notifyLogicalTableConfigChangeListeners();
}
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index eff90fcf49..dfb2bbe401 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -52,6 +52,7 @@ import org.apache.pinot.spi.config.user.UserConfig;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.ZkPaths;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.StringUtil;
import org.apache.pinot.spi.utils.TableConfigDecoratorRegistry;
@@ -73,7 +74,6 @@ public class ZKMetadataProvider {
private static final String PROPERTYSTORE_SEGMENTS_PREFIX = "/SEGMENTS";
private static final String PROPERTYSTORE_PAUSELESS_DEBUG_METADATA_PREFIX =
"/PAUSELESS_DEBUG_METADATA";
private static final String PROPERTYSTORE_SCHEMAS_PREFIX = "/SCHEMAS";
- private static final String PROPERTYSTORE_LOGICAL_PREFIX = "/LOGICAL/TABLE";
private static final String PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX =
"/INSTANCE_PARTITIONS";
private static final String PROPERTYSTORE_DATABASE_CONFIGS_PREFIX =
"/CONFIGS/DATABASE";
private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX =
"/CONFIGS/TABLE";
@@ -312,7 +312,7 @@ public class ZKMetadataProvider {
}
public static String constructPropertyStorePathForLogical(String tableName) {
- return StringUtil.join("/", PROPERTYSTORE_LOGICAL_PREFIX, tableName);
+ return StringUtil.join("/", ZkPaths.LOGICAL_TABLE_PARENT_PATH, tableName);
}
public static boolean isSegmentExisted(ZkHelixPropertyStore<ZNRecord>
propertyStore, String resourceNameForResource,
@@ -850,7 +850,7 @@ public class ZKMetadataProvider {
public static List<LogicalTableConfig>
getAllLogicalTableConfigs(ZkHelixPropertyStore<ZNRecord> propertyStore) {
List<ZNRecord> znRecords =
- propertyStore.getChildren(PROPERTYSTORE_LOGICAL_PREFIX, null,
AccessOption.PERSISTENT, 0, 0);
+ propertyStore.getChildren(ZkPaths.LOGICAL_TABLE_PARENT_PATH, null,
AccessOption.PERSISTENT, 0, 0);
if (znRecords != null) {
return znRecords.stream().map(znRecord -> {
try {
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotHelixPropertyStoreZnRecordProvider.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotHelixPropertyStoreZnRecordProvider.java
index fc25ef4dd3..542fc9a248 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotHelixPropertyStoreZnRecordProvider.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotHelixPropertyStoreZnRecordProvider.java
@@ -21,6 +21,7 @@ package org.apache.pinot.common.utils.helix;
import org.apache.helix.AccessOption;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.spi.utils.CommonConstants.ZkPaths;
public class PinotHelixPropertyStoreZnRecordProvider {
@@ -52,7 +53,7 @@ public class PinotHelixPropertyStoreZnRecordProvider {
}
public static PinotHelixPropertyStoreZnRecordProvider
forLogicalTable(ZkHelixPropertyStore<ZNRecord> propertyStore) {
- return new PinotHelixPropertyStoreZnRecordProvider(propertyStore,
"/LOGICAL/TABLE");
+ return new PinotHelixPropertyStoreZnRecordProvider(propertyStore,
ZkPaths.LOGICAL_TABLE_PARENT_PATH);
}
public ZNRecord get(String name) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
index 7969d2aa86..8acf41df87 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
@@ -198,6 +198,17 @@ public class ControllerRequestClient {
}
}
+ public void deleteLogicalTable(String logicalTableName)
+ throws IOException { // Sends a DELETE request to the URL built by forLogicalTableDelete for the given logical table.
+ try {
+ HttpClient.wrapAndThrowHttpException(
+ _httpClient.sendDeleteRequest(new
URI(_controllerRequestURLBuilder.forLogicalTableDelete(logicalTableName)),
+ _headers));
+ } catch (HttpErrorStatusException | URISyntaxException e) { // both failure types are rethrown as IOException with the cause attached
+ throw new IOException(e);
+ }
+ }
+
public TableConfig getTableConfig(String tableName, TableType tableType)
throws IOException {
try {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index d7bcf53bac..21894be117 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1586,6 +1586,12 @@ public class PinotHelixResourceManager {
}
updateSchema(schema, oldSchema, forceTableSchemaUpdate);
+ if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, schemaName)) {
+ // For logical table schemas, we do not need to reload segments or send
schema refresh messages
+ LOGGER.info("Logical table schema: {} updated, no need to reload
segments or send schema refresh messages",
+ schemaName);
+ return;
+ }
try {
List<String> tableNamesWithType =
getExistingTableNamesWithType(schemaName, null);
if (reload) {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 01092a3137..55e0bb1772 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -800,6 +800,11 @@ public class ControllerTest {
getControllerRequestClient().deleteTable(TableNameBuilder.REALTIME.tableNameWithType(tableName));
}
+ public void dropLogicalTable(String logicalTableName)
+ throws IOException { // Test helper that delegates to ControllerRequestClient.deleteLogicalTable.
+ getControllerRequestClient().deleteLogicalTable(logicalTableName);
+ }
+
public void waitForEVToAppear(String tableNameWithType) {
TestUtils.waitForCondition(aVoid ->
_helixResourceManager.getTableExternalView(tableNameWithType) != null, 60_000L,
"Failed to create the external view for table: " + tableNameWithType);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/LogicalTableMetadataCacheTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/LogicalTableMetadataCacheTest.java
new file mode 100644
index 0000000000..db459e9aff
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/LogicalTableMetadataCacheTest.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.config.provider.LogicalTableMetadataCache;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+
+public class LogicalTableMetadataCacheTest {
+
+ private static final ControllerTest INSTANCE = ControllerTest.getInstance();
+ private static final LogicalTableMetadataCache CACHE = new
LogicalTableMetadataCache();
+
+ private final String _testTable = "testTable";
+ private final String _extraTableName = "testExtraTable";
+ private final Schema _testTableSchema =
ControllerTest.createDummySchema(_testTable);
+ private final Schema _extraTableSchema =
ControllerTest.createDummySchema(_extraTableName);
+ private final TableConfig _offlineTableConfig =
ControllerTest.createDummyTableConfig(_testTable, TableType.OFFLINE);
+ private final TableConfig _realtimeTableConfig =
+ ControllerTest.createDummyTableConfig(_testTable, TableType.REALTIME);
+ private final TableConfig _extraOfflineTableConfig =
+ ControllerTest.createDummyTableConfig(_extraTableName,
TableType.OFFLINE);
+ private final TableConfig _extraRealtimeTableConfig =
+ ControllerTest.createDummyTableConfig(_extraTableName,
TableType.REALTIME);
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ INSTANCE.setupSharedStateAndValidate();
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws Exception {
+ INSTANCE.stopSharedTestSetup();
+ }
+
+ @BeforeMethod
+ public void beforeMethod()
+ throws IOException {
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
INSTANCE.getHelixResourceManager().getPropertyStore();
+ CACHE.init(propertyStore);
+
+ // Setup schema and table configs in the property store
+ INSTANCE.addSchema(_testTableSchema);
+ INSTANCE.addSchema(_extraTableSchema);
+ INSTANCE.addTableConfig(_offlineTableConfig);
+ INSTANCE.addTableConfig(_realtimeTableConfig);
+ INSTANCE.addTableConfig(_extraOfflineTableConfig);
+ INSTANCE.addTableConfig(_extraRealtimeTableConfig);
+
+ // Ensure the schema and table configs are not loaded into the cache yet
+ assertNull(CACHE.getSchema(_testTable));
+ assertNull(CACHE.getSchema(_extraTableName));
+ assertNull(CACHE.getTableConfig(_offlineTableConfig.getTableName()));
+ assertNull(CACHE.getTableConfig(_realtimeTableConfig.getTableName()));
+ assertNull(CACHE.getTableConfig(_extraOfflineTableConfig.getTableName()));
+ assertNull(CACHE.getTableConfig(_extraRealtimeTableConfig.getTableName()));
+ }
+
+ @AfterMethod
+ public void afterMethod() {
+ CACHE.shutdown();
+ INSTANCE.cleanup();
+ }
+
+ @Test
+ public void testLogicalTableCacheWithUpdates()
+ throws IOException {
+ String logicalTableName = "testLogicalTable1";
+ LogicalTableConfig logicalTableConfig = addLogicalTableAndValidateCache(
+ logicalTableName, List.of(_offlineTableConfig.getTableName(),
_realtimeTableConfig.getTableName()));
+ Schema logicalTableSchema =
ControllerTest.createDummySchema(logicalTableName);
+
+ // Update logical table config and verify the cache is updated
+ Map<String, PhysicalTableConfig> physicalTableConfigMap =
logicalTableConfig.getPhysicalTableConfigMap();
+ physicalTableConfigMap.put(_extraOfflineTableConfig.getTableName(), new
PhysicalTableConfig());
+ physicalTableConfigMap.put(_extraRealtimeTableConfig.getTableName(), new
PhysicalTableConfig());
+ assertNotEquals(CACHE.getLogicalTableConfig(logicalTableName),
logicalTableConfig);
+ INSTANCE.updateLogicalTableConfig(logicalTableConfig);
+ TestUtils.waitForCondition(
+ aVoid ->
CACHE.getLogicalTableConfig(logicalTableName).equals(logicalTableConfig),
+ 10_000L, "Logical table config not updated in cache");
+ assertNull(CACHE.getTableConfig(_extraOfflineTableConfig.getTableName()));
+ assertNull(CACHE.getTableConfig(_extraRealtimeTableConfig.getTableName()));
+
+ // Update logical table schema and verify the cache is updated
+ logicalTableSchema.addField(new MetricFieldSpec("newMetric",
FieldSpec.DataType.INT));
+ assertNotEquals(CACHE.getSchema(logicalTableName), logicalTableSchema);
+ INSTANCE.updateSchema(logicalTableSchema);
+ TestUtils.waitForCondition(
+ aVoid -> CACHE.getSchema(logicalTableName).equals(logicalTableSchema),
+ 10_000L, "Logical table schema not updated in cache");
+
+ // Update offline table configs and verify the cache is updated (update retention)
+ _offlineTableConfig.getValidationConfig().setRetentionTimeValue("10");
+ assertNotEquals(
+
Objects.requireNonNull(CACHE.getTableConfig(_offlineTableConfig.getTableName()))
+ .getValidationConfig()
+ .getRetentionTimeValue(),
+ _offlineTableConfig.getValidationConfig().getRetentionTimeValue());
+ INSTANCE.updateTableConfig(_offlineTableConfig);
+ TestUtils.waitForCondition(
+ aVoid ->
Objects.requireNonNull(CACHE.getTableConfig(_offlineTableConfig.getTableName()))
+ .getValidationConfig()
+ .getRetentionTimeValue()
+
.equals(_offlineTableConfig.getValidationConfig().getRetentionTimeValue()),
+ 10_000L, "Offline table config not updated in cache");
+
+ // Update realtime table configs and verify the cache is updated (update retention)
+ _realtimeTableConfig.getValidationConfig().setRetentionTimeValue("20");
+ assertNotEquals(
+
Objects.requireNonNull(CACHE.getTableConfig(_realtimeTableConfig.getTableName()))
+ .getValidationConfig()
+ .getRetentionTimeValue(),
+ _realtimeTableConfig.getValidationConfig().getRetentionTimeValue());
+ INSTANCE.updateTableConfig(_realtimeTableConfig);
+ TestUtils.waitForCondition(
+ aVoid ->
Objects.requireNonNull(CACHE.getTableConfig(_realtimeTableConfig.getTableName()))
+ .getValidationConfig().getRetentionTimeValue()
+
.equals(_realtimeTableConfig.getValidationConfig().getRetentionTimeValue()),
+ 10_000L, "Realtime table config not updated in cache");
+
+ // Delete logical table config and verify the cache is removed
+ INSTANCE.dropLogicalTable(logicalTableName);
+ TestUtils.waitForCondition(
+ aVoid -> CACHE.getSchema(logicalTableName) == null,
+ 10_000L, "Logical table schema not removed from cache");
+ assertNull(CACHE.getLogicalTableConfig(logicalTableName));
+ assertNull(CACHE.getTableConfig(_offlineTableConfig.getTableName()));
+ assertNull(CACHE.getTableConfig(_realtimeTableConfig.getTableName()));
+ }
+
+ @Test
+ public void testLogicalTableUpdateRefTables()
+ throws IOException {
+ String logicalTableName = "testLogicalTable2";
+ LogicalTableConfig logicalTableConfig = addLogicalTableAndValidateCache(
+ logicalTableName, List.of(_offlineTableConfig.getTableName(),
_realtimeTableConfig.getTableName()));
+
+ // Update logical table config ref tables with extra tables and verify the cache is updated
+
logicalTableConfig.setRefOfflineTableName(_extraOfflineTableConfig.getTableName());
+
logicalTableConfig.setRefRealtimeTableName(_extraRealtimeTableConfig.getTableName());
+ logicalTableConfig.setPhysicalTableConfigMap(
+ Map.of(_extraOfflineTableConfig.getTableName(), new
PhysicalTableConfig(),
+ _extraRealtimeTableConfig.getTableName(), new
PhysicalTableConfig())
+ );
+ assertNotEquals(CACHE.getLogicalTableConfig(logicalTableName),
logicalTableConfig);
+
+ INSTANCE.updateLogicalTableConfig(logicalTableConfig);
+
+ TestUtils.waitForCondition(
+ aVoid ->
CACHE.getLogicalTableConfig(logicalTableName).equals(logicalTableConfig),
+ 10_000L, "Logical table config not updated in cache");
+
assertNotNull(CACHE.getTableConfig(_extraOfflineTableConfig.getTableName()));
+
assertNotNull(CACHE.getTableConfig(_extraRealtimeTableConfig.getTableName()));
+ assertNull(CACHE.getTableConfig(_offlineTableConfig.getTableName()));
+ assertNull(CACHE.getTableConfig(_realtimeTableConfig.getTableName()));
+ assertNotNull(CACHE.getSchema(logicalTableName));
+ }
+
+ @Test
+ public void testCacheWithMultipleLogicalTables()
+ throws IOException {
+ String logicalTableName = "testLogicalTable3";
+ LogicalTableConfig logicalTableConfig = addLogicalTableAndValidateCache(
+ logicalTableName, List.of(_offlineTableConfig.getTableName(),
_realtimeTableConfig.getTableName()));
+
+ String otherLogicalTableName = "otherLogicalTable";
+ LogicalTableConfig otherLogicalTableConfig =
addLogicalTableAndValidateCache(
+ otherLogicalTableName, List.of(_offlineTableConfig.getTableName(),
_realtimeTableConfig.getTableName()));
+
+ // Delete one logical table config and verify the other is still present
+ INSTANCE.dropLogicalTable(logicalTableName);
+ TestUtils.waitForCondition(
+ aVoid -> CACHE.getSchema(logicalTableName) == null,
+ 10_000L, "Logical table schema not removed from cache");
+ assertNull(CACHE.getLogicalTableConfig(logicalTableName));
+ assertNotNull(CACHE.getLogicalTableConfig(otherLogicalTableName));
+ assertNotNull(CACHE.getTableConfig(_offlineTableConfig.getTableName()));
+ assertNotNull(CACHE.getTableConfig(_realtimeTableConfig.getTableName()));
+
+ // Delete the other logical table config and verify the cache is empty
+ INSTANCE.dropLogicalTable(otherLogicalTableName);
+ TestUtils.waitForCondition(
+ aVoid -> CACHE.getSchema(otherLogicalTableName) == null,
+ 10_000L, "Logical table schema not removed from cache");
+ assertNull(CACHE.getLogicalTableConfig(otherLogicalTableName));
+ assertNull(CACHE.getTableConfig(_offlineTableConfig.getTableName()));
+ assertNull(CACHE.getTableConfig(_realtimeTableConfig.getTableName()));
+ }
+
+ private LogicalTableConfig addLogicalTableAndValidateCache(String
logicalTableName, List<String> physicalTableNames)
+ throws IOException {
+ // Add logical table config
+ Schema logicalTableSchema =
ControllerTest.createDummySchema(logicalTableName);
+ LogicalTableConfig logicalTableConfig =
ControllerTest.getDummyLogicalTableConfig(logicalTableName,
+ physicalTableNames, "DefaultTenant");
+ INSTANCE.addSchema(logicalTableSchema);
+ INSTANCE.addLogicalTableConfig(logicalTableConfig);
+
+ // wait for the cache to be updated
+ TestUtils.waitForCondition(
+ aVoid -> CACHE.getLogicalTableConfig(logicalTableName) != null,
+ 10_000L, "Logical table config not loaded into cache");
+
+ // Verify that the logical table config is loaded into the cache
+ assertNotNull(CACHE.getSchema(logicalTableName));
+ assertEquals(CACHE.getSchema(logicalTableName), logicalTableSchema);
+ assertNotNull(CACHE.getTableConfig(_offlineTableConfig.getTableName()));
+ assertNotNull(CACHE.getTableConfig(_realtimeTableConfig.getTableName()));
+ assertNotNull(CACHE.getLogicalTableConfig(logicalTableName));
+ assertEquals(CACHE.getLogicalTableConfig(logicalTableName),
logicalTableConfig);
+
+ // verify extra schema and table configs are not loaded
+ assertNull(CACHE.getSchema(_extraTableName));
+ assertNull(CACHE.getTableConfig(_extraOfflineTableConfig.getTableName()));
+ assertNull(CACHE.getTableConfig(_extraRealtimeTableConfig.getTableName()));
+ return logicalTableConfig;
+ }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
index 0dcbb3a9f3..414b664241 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
@@ -619,7 +619,9 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra
JsonNode response = postQueryToController(query);
assertNoError(response);
- query = "SELECT count(*) FROM " + getOfflineTableNames().get(0);
+ String tableName =
+ getOfflineTableNames().isEmpty() ? getRealtimeTableNames().get(0) :
getOfflineTableNames().get(0);
+ query = "SELECT count(*) FROM " + tableName;
response = postQueryToController(query);
assertNoError(response);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
index b0eb2e12d0..807a9a9fd6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
@@ -253,10 +253,10 @@ public class ServerPlanRequestUtils {
instanceRequest.setBrokerId("unknown");
instanceRequest.setEnableTrace(executionContext.isTraceEnabled());
/*
- * If segmentList is not null, it means that the query is for a single table and we can directly set the segments.
- * If segmentList is null, it means that the query is for a logical table and we need to set TableSegmentInfoList
- *
- * Either one of segmentList or tableRouteInfoList has to be set, but not both.
+ * If segmentList is not null, it means that the query is for a single table and we can directly set the segments.
+ * If segmentList is null, it means that the query is for a logical table and we need to set TableSegmentInfoList
+ *
+ * Either one of segmentList or tableRouteInfoList has to be set, but not both.
*/
if (segmentList != null) {
instanceRequest.setSearchSegments(segmentList);
@@ -422,7 +422,8 @@ public class ServerPlanRequestUtils {
String logicalTableName = stageMetadata.getTableName();
LogicalTableContext logicalTableContext =
instanceDataManager.getLogicalTableContext(logicalTableName);
Preconditions.checkNotNull(logicalTableContext,
- "LogicalTableManager not found for logical table name: " +
logicalTableName);
+ String.format("LogicalTableContext not found for logical table name:
%s, query context id: %s",
+ logicalTableName, QueryThreadContext.getCid()));
Map<String, List<String>> logicalTableSegmentsMap =
executionContext.getWorkerMetadata().getLogicalTableSegmentsMap();
@@ -430,7 +431,7 @@ public class ServerPlanRequestUtils {
List<TableSegmentsInfo> realtimeTableRouteInfoList = new ArrayList<>();
Preconditions.checkNotNull(logicalTableSegmentsMap);
- for (Map.Entry<String, List<String>> entry:
logicalTableSegmentsMap.entrySet()) {
+ for (Map.Entry<String, List<String>> entry :
logicalTableSegmentsMap.entrySet()) {
String physicalTableName = entry.getKey();
TableType tableType =
TableNameBuilder.getTableTypeFromTableName(physicalTableName);
TableSegmentsInfo tableSegmentsInfo = new TableSegmentsInfo();
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 07da49bb5f..924e0ff333 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -44,6 +44,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.config.provider.LogicalTableMetadataCache;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
@@ -85,6 +86,10 @@ public class HelixInstanceDataManager implements InstanceDataManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(HelixInstanceDataManager.class);
private final Map<String, TableDataManager> _tableDataManagerMap = new
ConcurrentHashMap<>();
+
+ // Logical table metadata cache to cache logical table configs, schemas, and offline/realtime table configs.
+ private final LogicalTableMetadataCache _logicalTableMetadataCache = new
LogicalTableMetadataCache();
+
// TODO: Consider making segment locks per table instead of per instance
private final SegmentLocks _segmentLocks = new SegmentLocks();
@@ -228,6 +233,9 @@ public class HelixInstanceDataManager implements InstanceDataManager {
@Override
public synchronized void start() {
_propertyStore = _helixManager.getHelixPropertyStore();
+ // Initialize logical table metadata cache
+ _logicalTableMetadataCache.init(_propertyStore);
+
LOGGER.info("Helix instance data manager started");
}
@@ -255,6 +263,8 @@ public class HelixInstanceDataManager implements InstanceDataManager {
}
}
SegmentBuildTimeLeaseExtender.shutdownExecutor();
+ // shutdown logical table metadata cache
+ _logicalTableMetadataCache.shutdown();
LOGGER.info("Helix instance data manager shut down");
}
@@ -533,17 +543,15 @@ public class HelixInstanceDataManager implements InstanceDataManager {
}
}
- // TODO: LogicalTableContext has to be cached. https://github.com/apache/pinot/issues/15859
@Nullable
@Override
public LogicalTableContext getLogicalTableContext(String logicalTableName) {
- Schema schema = ZKMetadataProvider.getSchema(getPropertyStore(),
logicalTableName);
+ Schema schema = _logicalTableMetadataCache.getSchema(logicalTableName);
if (schema == null) {
LOGGER.warn("Failed to find schema for logical table: {}, skipping",
logicalTableName);
return null;
}
- LogicalTableConfig logicalTableConfig =
ZKMetadataProvider.getLogicalTableConfig(getPropertyStore(),
- logicalTableName);
+ LogicalTableConfig logicalTableConfig =
_logicalTableMetadataCache.getLogicalTableConfig(logicalTableName);
if (logicalTableConfig == null) {
LOGGER.warn("Failed to find logical table config for logical table: {},
skipping", logicalTableName);
return null;
@@ -551,8 +559,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
TableConfig offlineTableConfig = null;
if (logicalTableConfig.getRefOfflineTableName() != null) {
- offlineTableConfig =
ZKMetadataProvider.getOfflineTableConfig(getPropertyStore(),
- logicalTableConfig.getRefOfflineTableName());
+ offlineTableConfig =
_logicalTableMetadataCache.getTableConfig(logicalTableConfig.getRefOfflineTableName());
if (offlineTableConfig == null) {
LOGGER.warn("Failed to find offline table config for logical table:
{}, skipping", logicalTableName);
return null;
@@ -561,8 +568,7 @@ public class HelixInstanceDataManager implements InstanceDataManager {
TableConfig realtimeTableConfig = null;
if (logicalTableConfig.getRefRealtimeTableName() != null) {
- realtimeTableConfig =
ZKMetadataProvider.getRealtimeTableConfig(getPropertyStore(),
- logicalTableConfig.getRefRealtimeTableName());
+ realtimeTableConfig =
_logicalTableMetadataCache.getTableConfig(logicalTableConfig.getRefRealtimeTableName());
if (realtimeTableConfig == null) {
LOGGER.warn("Failed to find realtime table config for logical table:
{}, skipping", logicalTableName);
return null;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java
index 2c52148098..973502c543 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.spi.data;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
@@ -31,6 +32,26 @@ import org.apache.pinot.spi.config.table.QuotaConfig;
import org.apache.pinot.spi.utils.JsonUtils;
+/**
+ * Represents the configuration for a logical table in Pinot.
+ *
+ * <p>
+ * <ul>
+ * <li><b>tableName</b>: The name of the logical table.</li>
+ * <li><b>physicalTableConfigMap</b>: A map of physical table names to their configurations.</li>
+ * <li><b>brokerTenant</b>: The tenant for the broker.</li>
+ * <li><b>queryConfig</b>: Configuration for query execution on the logical table.</li>
+ * <li><b>quotaConfig</b>: Configuration for quota management on the logical table.</li>
+ * <li><b>refOfflineTableName</b>: The name of the offline table whose table config is referenced by this logical
+ * table.</li>
+ * <li><b>refRealtimeTableName</b>: The name of the realtime table whose table config is referenced by this logical
+ * table.</li>
+ * <li><b>timeBoundaryConfig</b>: Configuration for time boundaries of the logical table. This is used to determine
+ * the time boundaries for queries on the logical table, especially in hybrid scenarios where both offline and
+ * realtime data are present.</li>
+ * </ul>
+ * </p>
+ */
public class LogicalTableConfig extends BaseJsonConfig {
private static final ObjectMapper DEFAULT_MAPPER = new ObjectMapper();
@@ -105,6 +126,7 @@ public class LogicalTableConfig extends BaseJsonConfig {
_quotaConfig = quotaConfig;
}
+ @Nullable
public String getRefOfflineTableName() {
return _refOfflineTableName;
}
@@ -113,6 +135,7 @@ public class LogicalTableConfig extends BaseJsonConfig {
_refOfflineTableName = refOfflineTableName;
}
+ @Nullable
public String getRefRealtimeTableName() {
return _refRealtimeTableName;
}
@@ -121,6 +144,7 @@ public class LogicalTableConfig extends BaseJsonConfig {
_refRealtimeTableName = refRealtimeTableName;
}
+ @Nullable
public TimeBoundaryConfig getTimeBoundaryConfig() {
return _timeBoundaryConfig;
}
@@ -151,6 +175,7 @@ public class LogicalTableConfig extends BaseJsonConfig {
}
}
+ @JsonIgnore
public boolean isHybridLogicalTable() {
return _refOfflineTableName != null && _refRealtimeTableName != null;
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 7152d83f2b..86b8200dfc 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1719,4 +1719,14 @@ public class CommonConstants {
public static final String GROOVY_QUERY_STATIC_ANALYZER_CONFIG =
"pinot.groovy.query.static.analyzer";
public static final String GROOVY_INGESTION_STATIC_ANALYZER_CONFIG =
"pinot.groovy.ingestion.static.analyzer";
}
+
+ /**
+ * ZK paths used by Pinot.
+ */
+ public static class ZkPaths {
+ public static final String LOGICAL_TABLE_PARENT_PATH = "/LOGICAL/TABLE";
+ public static final String LOGICAL_TABLE_PATH_PREFIX = "/LOGICAL/TABLE/";
+ public static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
+ public static final String SCHEMA_PATH_PREFIX = "/SCHEMAS/";
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]