This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 08769d2d7b Logical table CRUD operations. (#15515)
08769d2d7b is described below
commit 08769d2d7b678119ef9622b95cc5307ef7663dcf
Author: Abhishek Bafna <[email protected]>
AuthorDate: Tue May 6 14:47:46 2025 +0530
Logical table CRUD operations. (#15515)
* Logical table CRUD operations.
* Logical tables znRecord update, validations and more unit tests.
* Update test comment messages
* Code refactoring and more unit tests.
* more unit tests.
* Add logical table to table cache.
* Code refactoring to handle code duplication.
* Store logical table json using jackson.
* remove sout statement.
* Code refactoring - physical tables config as map.
* Fix checkstyle violations.
* Review comments addressed.
* Code refactoring and address review comments.
* Addressing review comments.
* Refactor LogicalTable to LogicalTableConfig.
* Refactor LogicalTable to LogicalTableConfig.
* Refactor LogicalTable to LogicalTableConfig.
* more unit tests.
---------
Co-authored-by: abhishekbafna <[email protected]>
---
.../BaseSingleStageBrokerRequestHandler.java | 7 +-
.../pinot/common/config/provider/TableCache.java | 198 ++++++++++++-
.../pinot/common/metadata/ZKMetadataProvider.java | 55 +++-
.../pinot/common/utils/LogicalTableUtils.java | 118 ++++++++
.../api/resources/PinotLogicalTableResource.java | 263 +++++++++++++++++
.../helix/core/PinotHelixResourceManager.java | 59 ++++
.../PinotAdminUserLogicalTableResourceTest.java | 51 ++++
.../resources/PinotLogicalTableResourceTest.java | 311 +++++++++++++++++++++
...inotUserWithAccessLogicalTableResourceTest.java | 173 ++++++++++++
.../pinot/controller/helix/ControllerTest.java | 52 ++++
.../pinot/controller/helix/TableCacheTest.java | 97 +++++++
.../provider/LogicalTableConfigChangeListener.java | 31 ++
.../spi/config/provider/PinotConfigProvider.java | 16 ++
.../apache/pinot/spi/data/LogicalTableConfig.java | 118 ++++++++
.../apache/pinot/spi/data/PhysicalTableConfig.java | 29 ++
.../utils/builder/ControllerRequestURLBuilder.java | 20 ++
.../utils/builder/LogicalTableConfigBuilder.java | 53 ++++
17 files changed, 1646 insertions(+), 5 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index 97510585b8..3f923ba0ef 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -1178,10 +1178,11 @@ public abstract class
BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
@VisibleForTesting
static String getActualTableName(String tableName, TableCache tableCache) {
String actualTableName = tableCache.getActualTableName(tableName);
- if (actualTableName != null) {
- return actualTableName;
+ // If actual table name is not found for physical table, check in the
logical tables
+ if (actualTableName == null) {
+ actualTableName = tableCache.getActualLogicalTableName(tableName);
}
- return tableName;
+ return actualTableName != null ? actualTableName : tableName;
}
/**
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
index 17d953abec..726df76930 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
@@ -25,9 +25,11 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
@@ -37,8 +39,10 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.utils.LogicalTableUtils;
import org.apache.pinot.common.utils.SchemaUtils;
import org.apache.pinot.common.utils.config.TableConfigUtils;
+import org.apache.pinot.spi.config.provider.LogicalTableConfigChangeListener;
import org.apache.pinot.spi.config.provider.PinotConfigProvider;
import org.apache.pinot.spi.config.provider.SchemaChangeListener;
import org.apache.pinot.spi.config.provider.TableConfigChangeListener;
@@ -46,6 +50,7 @@ import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants.Segment.BuiltInVirtualColumn;
import org.apache.pinot.spi.utils.TimestampIndexUtils;
@@ -66,6 +71,8 @@ public class TableCache implements PinotConfigProvider {
private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
private static final String SCHEMA_PARENT_PATH = "/SCHEMAS";
private static final String SCHEMA_PATH_PREFIX = "/SCHEMAS/";
+ private static final String LOGICAL_TABLE_PARENT_PATH = "/LOGICAL/TABLE";
+ private static final String LOGICAL_TABLE_PATH_PREFIX = "/LOGICAL/TABLE/";
private static final String OFFLINE_TABLE_SUFFIX = "_OFFLINE";
private static final String REALTIME_TABLE_SUFFIX = "_REALTIME";
private static final String LOWER_CASE_OFFLINE_TABLE_SUFFIX =
OFFLINE_TABLE_SUFFIX.toLowerCase();
@@ -74,6 +81,7 @@ public class TableCache implements PinotConfigProvider {
// NOTE: No need to use concurrent set because it is always accessed within
the ZK change listener lock
private final Set<TableConfigChangeListener> _tableConfigChangeListeners =
new HashSet<>();
private final Set<SchemaChangeListener> _schemaChangeListeners = new
HashSet<>();
+ private final Set<LogicalTableConfigChangeListener>
_logicalTableConfigChangeListeners = new HashSet<>();
private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
private final boolean _ignoreCase;
@@ -89,6 +97,14 @@ public class TableCache implements PinotConfigProvider {
// Key is schema name, value is schema info
private final Map<String, SchemaInfo> _schemaInfoMap = new
ConcurrentHashMap<>();
+ private final ZkLogicalTableConfigChangeListener
+ _zkLogicalTableConfigChangeListener = new
ZkLogicalTableConfigChangeListener();
+ // Key is table name, value is logical table info
+ private final Map<String, LogicalTableConfigInfo> _logicalTableConfigInfoMap
= new ConcurrentHashMap<>();
+ // Key is lower case logical table name, value is actual logical table name
+ // For case-insensitive mode only
+ private final Map<String, String> _logicalTableNameMap = new
ConcurrentHashMap<>();
+
public TableCache(ZkHelixPropertyStore<ZNRecord> propertyStore, boolean
ignoreCase) {
_propertyStore = propertyStore;
_ignoreCase = ignoreCase;
@@ -121,6 +137,19 @@ public class TableCache implements PinotConfigProvider {
}
}
+ synchronized (_zkLogicalTableConfigChangeListener) {
+ // Subscribe child changes before reading the data to avoid missing
changes
+ _propertyStore.subscribeChildChanges(LOGICAL_TABLE_PARENT_PATH,
_zkLogicalTableConfigChangeListener);
+
+ List<String> tables =
_propertyStore.getChildNames(LOGICAL_TABLE_PARENT_PATH,
AccessOption.PERSISTENT);
+ if (CollectionUtils.isNotEmpty(tables)) {
+ List<String> pathsToAdd = tables.stream()
+ .map(rawTableName -> LOGICAL_TABLE_PATH_PREFIX + rawTableName)
+ .collect(Collectors.toList());
+ addLogicalTableConfigs(pathsToAdd);
+ }
+ }
+
LOGGER.info("Initialized TableCache with IgnoreCase: {}", ignoreCase);
}
@@ -144,6 +173,18 @@ public class TableCache implements PinotConfigProvider {
}
}
+ /**
+ * Returns the actual logical table name for the given table name, or {@code
null} if table does not exist.
+ * @param logicalTableName Logical table name
+ * @return Actual logical table name
+ */
+ @Nullable
+ public String getActualLogicalTableName(String logicalTableName) {
+ return _ignoreCase
+ ? _logicalTableNameMap.get(logicalTableName.toLowerCase())
+ : _logicalTableNameMap.get(logicalTableName);
+ }
+
/**
* Returns a map from table name to actual table name. For case-insensitive
case, the keys of the map are in lower
* case.
@@ -152,6 +193,15 @@ public class TableCache implements PinotConfigProvider {
return _tableNameMap;
}
+ /**
+ * Returns a map from logical table name to actual logical table name. For
case-insensitive case, the keys of the map
+ * are in lower case.
+ * @return Map from logical table name to actual logical table name
+ */
+ public Map<String, String> getLogicalTableNameMap() {
+ return _logicalTableNameMap;
+ }
+
/**
* Get all dimension table names.
* @return List of dimension table names
@@ -204,6 +254,14 @@ public class TableCache implements PinotConfigProvider {
return tableConfigInfo != null ? tableConfigInfo._tableConfig : null;
}
+ @Nullable
+ @Override
+ public LogicalTableConfig getLogicalTableConfig(String logicalTableName) {
+ logicalTableName = _ignoreCase ? logicalTableName.toLowerCase() :
logicalTableName;
+ LogicalTableConfigInfo logicalTableConfigInfo =
_logicalTableConfigInfoMap.get(logicalTableName);
+ return logicalTableConfigInfo != null ?
logicalTableConfigInfo._logicalTableConfig : null;
+ }
+
@Override
public boolean registerTableConfigChangeListener(TableConfigChangeListener
tableConfigChangeListener) {
synchronized (_zkTableConfigChangeListener) {
@@ -216,15 +274,33 @@ public class TableCache implements PinotConfigProvider {
}
/**
- * Returns the schema for the given table, or {@code null} if it does not
exist.
+ * Returns the schema for the given logical or physical table, or {@code
null} if it does not exist.
*/
@Nullable
@Override
public Schema getSchema(String rawTableName) {
+ if (_schemaInfoMap.containsKey(rawTableName)) {
+ return getPhysicalTableSchema(rawTableName);
+ } else {
+ return getLogicalTableSchema(rawTableName);
+ }
+ }
+
+ private Schema getPhysicalTableSchema(String rawTableName) {
SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
return schemaInfo != null ? schemaInfo._schema : null;
}
+ @Nullable
+ private Schema getLogicalTableSchema(String logicalTableName) {
+ LogicalTableConfig logicalTableConfig =
getLogicalTableConfig(logicalTableName);
+ if (logicalTableConfig == null) {
+ return null;
+ }
+ Optional<String> physicalTableName =
logicalTableConfig.getPhysicalTableConfigMap().keySet().stream().findFirst();
+ return
getPhysicalTableSchema(TableNameBuilder.extractRawTableName(physicalTableName.orElse(null)));
+ }
+
@Override
public boolean registerSchemaChangeListener(SchemaChangeListener
schemaChangeListener) {
synchronized (_zkSchemaChangeListener) {
@@ -253,6 +329,23 @@ public class TableCache implements PinotConfigProvider {
}
}
+ private void addLogicalTableConfigs(List<String> paths) {
+ // Subscribe data changes before reading the data to avoid missing changes
+ for (String path : paths) {
+ _propertyStore.subscribeDataChanges(path,
_zkLogicalTableConfigChangeListener);
+ }
+ List<ZNRecord> znRecords = _propertyStore.get(paths, null,
AccessOption.PERSISTENT);
+ for (ZNRecord znRecord : znRecords) {
+ if (znRecord != null) {
+ try {
+ putLogicalTableConfig(znRecord);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while adding logical table for
ZNRecord: {}", znRecord.getId(), e);
+ }
+ }
+ }
+ }
+
private void putTableConfig(ZNRecord znRecord)
throws IOException {
TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord);
@@ -268,6 +361,19 @@ public class TableCache implements PinotConfigProvider {
}
}
+ private void putLogicalTableConfig(ZNRecord znRecord)
+ throws IOException {
+ LogicalTableConfig logicalTableConfig =
LogicalTableUtils.fromZNRecord(znRecord);
+ String logicalTableName = logicalTableConfig.getTableName();
+ if (_ignoreCase) {
+ _logicalTableNameMap.put(logicalTableName.toLowerCase(),
logicalTableName);
+ _logicalTableConfigInfoMap.put(logicalTableName.toLowerCase(), new
LogicalTableConfigInfo(logicalTableConfig));
+ } else {
+ _logicalTableNameMap.put(logicalTableName, logicalTableName);
+ _logicalTableConfigInfoMap.put(logicalTableName, new
LogicalTableConfigInfo(logicalTableConfig));
+ }
+ }
+
private void removeTableConfig(String path) {
_propertyStore.unsubscribeDataChanges(path, _zkTableConfigChangeListener);
String tableNameWithType =
path.substring(TABLE_CONFIG_PATH_PREFIX.length());
@@ -299,6 +405,14 @@ public class TableCache implements PinotConfigProvider {
}
}
+ private void removeLogicalTableConfig(String path) {
+ _propertyStore.unsubscribeDataChanges(path,
_zkLogicalTableConfigChangeListener);
+ String logicalTableName =
path.substring(LOGICAL_TABLE_PATH_PREFIX.length());
+ logicalTableName = _ignoreCase ? logicalTableName.toLowerCase() :
logicalTableName;
+ _logicalTableConfigInfoMap.remove(logicalTableName);
+ _logicalTableNameMap.remove(logicalTableName);
+ }
+
private void addSchemas(List<String> paths) {
// Subscribe data changes before reading the data to avoid missing changes
for (String path : paths) {
@@ -365,6 +479,15 @@ public class TableCache implements PinotConfigProvider {
}
}
+ private void notifyLogicalTableConfigChangeListeners() {
+ if (!_logicalTableConfigChangeListeners.isEmpty()) {
+ List<LogicalTableConfig> logicalTableConfigs = getLogicalTableConfigs();
+ for (LogicalTableConfigChangeListener listener :
_logicalTableConfigChangeListeners) {
+ listener.onChange(logicalTableConfigs);
+ }
+ }
+ }
+
private List<TableConfig> getTableConfigs() {
List<TableConfig> tableConfigs = new
ArrayList<>(_tableConfigInfoMap.size());
for (TableConfigInfo tableConfigInfo : _tableConfigInfoMap.values()) {
@@ -373,6 +496,10 @@ public class TableCache implements PinotConfigProvider {
return tableConfigs;
}
+ public List<LogicalTableConfig> getLogicalTableConfigs() {
+ return _logicalTableConfigInfoMap.values().stream().map(o ->
o._logicalTableConfig).collect(Collectors.toList());
+ }
+
private void notifySchemaChangeListeners() {
if (!_schemaChangeListeners.isEmpty()) {
List<Schema> schemas = getSchemas();
@@ -390,6 +517,23 @@ public class TableCache implements PinotConfigProvider {
return schemas;
}
+ public boolean isLogicalTable(String logicalTableName) {
+ logicalTableName = _ignoreCase ? logicalTableName.toLowerCase() :
logicalTableName;
+ return _logicalTableConfigInfoMap.containsKey(logicalTableName);
+ }
+
+ @Override
+ public boolean registerLogicalTableConfigChangeListener(
+ LogicalTableConfigChangeListener logicalTableConfigChangeListener) {
+ synchronized (_zkLogicalTableConfigChangeListener) {
+ boolean added =
_logicalTableConfigChangeListeners.add(logicalTableConfigChangeListener);
+ if (added) {
+ logicalTableConfigChangeListener.onChange(getLogicalTableConfigs());
+ }
+ return added;
+ }
+ }
+
private class ZkTableConfigChangeListener implements IZkChildListener,
IZkDataListener {
@Override
@@ -476,6 +620,49 @@ public class TableCache implements PinotConfigProvider {
}
}
+ private class ZkLogicalTableConfigChangeListener implements
IZkChildListener, IZkDataListener {
+
+ @Override
+ public synchronized void handleChildChange(String path, List<String>
logicalTableNames) {
+ if (CollectionUtils.isEmpty(logicalTableNames)) {
+ return;
+ }
+
+ // Only process new added logical tables. Changed/removed logical tables
are handled by other callbacks.
+ List<String> pathsToAdd = new ArrayList<>();
+ for (String logicalTableName : logicalTableNames) {
+ if (!_logicalTableConfigInfoMap.containsKey(logicalTableName)) {
+ pathsToAdd.add(LOGICAL_TABLE_PATH_PREFIX + logicalTableName);
+ }
+ }
+ if (!pathsToAdd.isEmpty()) {
+ addLogicalTableConfigs(pathsToAdd);
+ }
+ notifyLogicalTableConfigChangeListeners();
+ }
+
+ @Override
+ public synchronized void handleDataChange(String path, Object data) {
+ if (data != null) {
+ ZNRecord znRecord = (ZNRecord) data;
+ try {
+ putLogicalTableConfig(znRecord);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while refreshing logical table for
ZNRecord: {}", znRecord.getId(), e);
+ }
+ notifyLogicalTableConfigChangeListeners();
+ }
+ }
+
+ @Override
+ public synchronized void handleDataDeleted(String path) {
+ // NOTE: The path here is the absolute ZK path instead of the relative
path to the property store.
+ String logicalTableName = path.substring(path.lastIndexOf('/') + 1);
+ removeLogicalTableConfig(LOGICAL_TABLE_PATH_PREFIX + logicalTableName);
+ notifyLogicalTableConfigChangeListeners();
+ }
+ }
+
private static class TableConfigInfo {
final TableConfig _tableConfig;
final Map<Expression, Expression> _expressionOverrideMap;
@@ -522,4 +709,13 @@ public class TableCache implements PinotConfigProvider {
_columnNameMap = columnNameMap;
}
}
+
+ private static class LogicalTableConfigInfo {
+ final LogicalTableConfig _logicalTableConfig;
+ // TODO : Add expression override map for logical table, issue #15607
+
+ private LogicalTableConfigInfo(LogicalTableConfig logicalTableConfig) {
+ _logicalTableConfig = logicalTableConfig;
+ }
+ }
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 096de78884..3025b1f5d9 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -18,12 +18,15 @@
*/
package org.apache.pinot.common.metadata;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@@ -37,6 +40,7 @@ import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.LogicalTableUtils;
import org.apache.pinot.common.utils.SchemaUtils;
import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils;
import org.apache.pinot.common.utils.config.TableConfigUtils;
@@ -45,6 +49,7 @@ import org.apache.pinot.spi.config.DatabaseConfig;
import org.apache.pinot.spi.config.table.QuotaConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.user.UserConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -67,6 +72,7 @@ public class ZKMetadataProvider {
private static final String PROPERTYSTORE_SEGMENTS_PREFIX = "/SEGMENTS";
private static final String PROPERTYSTORE_PAUSELESS_DEBUG_METADATA_PREFIX =
"/PAUSELESS_DEBUG_METADATA";
private static final String PROPERTYSTORE_SCHEMAS_PREFIX = "/SCHEMAS";
+ private static final String PROPERTYSTORE_LOGICAL_PREFIX = "/LOGICAL/TABLE";
private static final String PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX =
"/INSTANCE_PARTITIONS";
private static final String PROPERTYSTORE_DATABASE_CONFIGS_PREFIX =
"/CONFIGS/DATABASE";
private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX =
"/CONFIGS/TABLE";
@@ -304,6 +310,10 @@ public class ZKMetadataProvider {
return StringUtil.join("/", PROPERTYSTORE_MINION_TASK_METADATA_PREFIX,
taskType, tableNameWithType);
}
+ public static String constructPropertyStorePathForLogical(String tableName) {
+ return StringUtil.join("/", PROPERTYSTORE_LOGICAL_PREFIX, tableName);
+ }
+
public static boolean isSegmentExisted(ZkHelixPropertyStore<ZNRecord>
propertyStore, String resourceNameForResource,
String segmentName) {
return
propertyStore.exists(constructPropertyStorePathForSegment(resourceNameForResource,
segmentName),
@@ -376,6 +386,7 @@ public class ZKMetadataProvider {
return
propertyStore.remove(constructPropertyStorePathForSegment(tableNameWithType,
segmentName),
AccessOption.PERSISTENT);
}
+
public static boolean
removePauselessDebugMetadata(ZkHelixPropertyStore<ZNRecord> propertyStore,
String tableNameWithType) {
String pauselessDebugMetadataPath =
constructPropertyStorePathForPauselessDebugMetadata(tableNameWithType);
@@ -385,7 +396,6 @@ public class ZKMetadataProvider {
return true;
}
-
@Nullable
public static ZNRecord getZnRecord(ZkHelixPropertyStore<ZNRecord>
propertyStore, String path) {
Stat stat = new Stat();
@@ -809,4 +819,47 @@ public class ZKMetadataProvider {
return result;
}
}
+
+ public static void setLogicalTableConfig(ZkHelixPropertyStore<ZNRecord>
propertyStore,
+ LogicalTableConfig logicalTableConfig) {
+ try {
+ ZNRecord znRecord = LogicalTableUtils.toZNRecord(logicalTableConfig);
+ String path =
constructPropertyStorePathForLogical(logicalTableConfig.getTableName());
+ propertyStore.set(path, znRecord, AccessOption.PERSISTENT);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("Failed to convert logical table to
ZNRecord", e);
+ }
+ }
+
+ public static List<LogicalTableConfig>
getAllLogicalTableConfigs(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ List<ZNRecord> znRecords =
+ propertyStore.getChildren(PROPERTYSTORE_LOGICAL_PREFIX, null,
AccessOption.PERSISTENT, 0, 0);
+ if (znRecords != null) {
+ return znRecords.stream().map(znRecord -> {
+ try {
+ return LogicalTableUtils.fromZNRecord(znRecord);
+ } catch (IOException e) {
+ LOGGER.error("Caught exception while converting ZNRecord to
LogicalTable: {}", znRecord.getId(), e);
+ return null;
+ }
+ }).filter(Objects::nonNull).collect(Collectors.toList());
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
+ public static LogicalTableConfig
getLogicalTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore,
+ String tableName) {
+ try {
+ ZNRecord logicalTableZNRecord =
+ propertyStore.get(constructPropertyStorePathForLogical(tableName),
null, AccessOption.PERSISTENT);
+ if (logicalTableZNRecord == null) {
+ return null;
+ }
+ return LogicalTableUtils.fromZNRecord(logicalTableZNRecord);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while getting logical table: {}",
tableName, e);
+ return null;
+ }
+ }
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java
new file mode 100644
index 0000000000..0b5d715bd9
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+
+public class LogicalTableUtils {
+
+ private LogicalTableUtils() {
+ // Utility class
+ }
+
+ public static LogicalTableConfig fromZNRecord(ZNRecord record)
+ throws IOException {
+ LogicalTableConfigBuilder builder = new LogicalTableConfigBuilder()
+
.setTableName(record.getSimpleField(LogicalTableConfig.LOGICAL_TABLE_NAME_KEY))
+
.setBrokerTenant(record.getSimpleField(LogicalTableConfig.BROKER_TENANT_KEY));
+
+ Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
+ for (Map.Entry<String, String> entry :
record.getMapField(LogicalTableConfig.PHYSICAL_TABLE_CONFIG_KEY)
+ .entrySet()) {
+ String physicalTableName = entry.getKey();
+ String physicalTableConfigJson = entry.getValue();
+ physicalTableConfigMap.put(physicalTableName,
+ JsonUtils.stringToObject(physicalTableConfigJson,
PhysicalTableConfig.class));
+ }
+ builder.setPhysicalTableConfigMap(physicalTableConfigMap);
+ return builder.build();
+ }
+
+ public static ZNRecord toZNRecord(LogicalTableConfig logicalTableConfig)
+ throws JsonProcessingException {
+ Map<String, String> physicalTableConfigMap = new HashMap<>();
+ for (Map.Entry<String, PhysicalTableConfig> entry :
logicalTableConfig.getPhysicalTableConfigMap().entrySet()) {
+ String physicalTableName = entry.getKey();
+ PhysicalTableConfig physicalTableConfig = entry.getValue();
+ physicalTableConfigMap.put(physicalTableName,
physicalTableConfig.toJsonString());
+ }
+
+ ZNRecord record = new ZNRecord(logicalTableConfig.getTableName());
+ record.setSimpleField(LogicalTableConfig.LOGICAL_TABLE_NAME_KEY,
logicalTableConfig.getTableName());
+ record.setSimpleField(LogicalTableConfig.BROKER_TENANT_KEY,
logicalTableConfig.getBrokerTenant());
+ record.setMapField(LogicalTableConfig.PHYSICAL_TABLE_CONFIG_KEY,
physicalTableConfigMap);
+ return record;
+ }
+
+ public static void validateLogicalTableName(LogicalTableConfig
logicalTableConfig, List<String> allPhysicalTables,
+ Set<String> allBrokerTenantNames) {
+ String tableName = logicalTableConfig.getTableName();
+ if (StringUtils.isEmpty(tableName)) {
+ throw new IllegalArgumentException("Invalid logical table name. Reason:
'tableName' should not be null or empty");
+ }
+
+ if (TableNameBuilder.isOfflineTableResource(tableName) ||
TableNameBuilder.isRealtimeTableResource(tableName)) {
+ throw new IllegalArgumentException(
+ "Invalid logical table name. Reason: 'tableName' should not end with
_OFFLINE or _REALTIME");
+ }
+
+ if (logicalTableConfig.getPhysicalTableConfigMap() == null ||
logicalTableConfig.getPhysicalTableConfigMap()
+ .isEmpty()) {
+ throw new IllegalArgumentException(
+ "Invalid logical table. Reason: 'physicalTableConfigMap' should not
be null or empty");
+ }
+
+ for (Map.Entry<String, PhysicalTableConfig> entry :
logicalTableConfig.getPhysicalTableConfigMap().entrySet()) {
+ String physicalTableName = entry.getKey();
+ PhysicalTableConfig physicalTableConfig = entry.getValue();
+
+ // validate physical table exists
+ if (!allPhysicalTables.contains(physicalTableName)) {
+ throw new IllegalArgumentException(
+ "Invalid logical table. Reason: '" + physicalTableName + "' should
be one of the existing tables");
+ }
+ // validate physical table config is not null
+ if (physicalTableConfig == null) {
+ throw new IllegalArgumentException(
+ "Invalid logical table. Reason: 'physicalTableConfig' should not
be null for physical table: "
+ + physicalTableName);
+ }
+ }
+
+ // validate broker tenant
+ String brokerTenant = logicalTableConfig.getBrokerTenant();
+ if (!allBrokerTenantNames.contains(brokerTenant)) {
+ throw new IllegalArgumentException(
+ "Invalid logical table. Reason: '" + brokerTenant + "' should be one
of the existing broker tenants");
+ }
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java
new file mode 100644
index 0000000000..2e65ab9378
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.util.List;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.arrow.util.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.common.utils.DatabaseUtils;
+import org.apache.pinot.common.utils.LogicalTableUtils;
+import org.apache.pinot.controller.api.access.AccessControlFactory;
+import org.apache.pinot.controller.api.access.AccessType;
+import org.apache.pinot.controller.api.access.Authenticate;
+import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.api.exception.TableAlreadyExistsException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.auth.Actions;
+import org.apache.pinot.core.auth.Authorize;
+import org.apache.pinot.core.auth.ManualAuthorization;
+import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.glassfish.grizzly.http.server.Request;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.spi.utils.CommonConstants.DATABASE;
+import static
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+
+@Api(tags = "LogicalTable", authorizations = {
+ @Authorization(value = SWAGGER_AUTHORIZATION_KEY), @Authorization(value =
DATABASE)
+})
+@SwaggerDefinition(securityDefinition =
@SecurityDefinition(apiKeyAuthDefinitions = {
+ @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER,
+ key = SWAGGER_AUTHORIZATION_KEY,
+ description = "The format of the key is ```\"Basic <token>\" or
\"Bearer <token>\"```"),
+ @ApiKeyAuthDefinition(name = DATABASE, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = DATABASE,
+ description = "Database context passed through http header. If no
context is provided 'default' "
+ + "database context will be considered.")
+}))
+@Path("/")
+public class PinotLogicalTableResource {
+ public static final Logger LOGGER =
LoggerFactory.getLogger(PinotLogicalTableResource.class);
+ private static final String DEFAULT_BROKER_TENANT = "DefaultTenant";
+
+ @Inject
+ PinotHelixResourceManager _pinotHelixResourceManager;
+
+ @Inject
+ AccessControlFactory _accessControlFactory;
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/logicalTables")
+ @Authorize(targetType = TargetType.CLUSTER, paramName = "tableName", action
= Actions.Cluster.GET_TABLE)
+ @ApiOperation(value = "List all logical table names", notes = "Lists all
logical table names")
+ public List<String> listLogicalTableNames(@Context HttpHeaders headers) {
+ return _pinotHelixResourceManager.getAllLogicalTableNames();
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/logicalTables/{tableName}")
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.GET_TABLE_CONFIG)
+ @ApiOperation(value = "Get a logical table", notes = "Gets a logical table
by name")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 404,
message = "Logical table not found"),
+ @ApiResponse(code = 500, message = "Internal error")
+ })
+ public String getLogicalTable(
+ @ApiParam(value = "Logical table name", required = true)
@PathParam("tableName") String tableName,
+ @Context HttpHeaders headers) {
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ LOGGER.info("Looking for logical table {}", tableName);
+ LogicalTableConfig logicalTableConfig =
_pinotHelixResourceManager.getLogicalTableConfig(tableName);
+ if (logicalTableConfig == null) {
+ throw new ControllerApplicationException(LOGGER, "Logical table not
found", Response.Status.NOT_FOUND);
+ }
+ return logicalTableConfig.toPrettyJsonString();
+ }
+
+ @POST
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/logicalTables")
+ @ApiOperation(value = "Add a new logical table", notes = "Adds a new logical
table")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Successfully created logical
table"), @ApiResponse(code = 409, message =
+ "Logical table already exists"), @ApiResponse(code = 400, message =
"Missing or invalid request body"),
+ @ApiResponse(code = 500, message = "Internal error")
+ })
+ @ManualAuthorization
+ public SuccessResponse addLogicalTable(
+ String logicalTableJsonString, @Context HttpHeaders httpHeaders,
+ @Context Request request) {
+ Pair<LogicalTableConfig, Map<String, Object>>
logicalTableConfigAndUnrecognizedProps =
+ getLogicalAndUnrecognizedPropertiesFromJson(logicalTableJsonString);
+ LogicalTableConfig logicalTableConfig =
logicalTableConfigAndUnrecognizedProps.getLeft();
+ String tableName =
DatabaseUtils.translateTableName(logicalTableConfig.getTableName(),
httpHeaders);
+ logicalTableConfig.setTableName(tableName);
+
+ // validate permission
+ ResourceUtils.checkPermissionAndAccess(tableName, request, httpHeaders,
AccessType.CREATE,
+ Actions.Table.CREATE_TABLE, _accessControlFactory, LOGGER);
+
+ SuccessResponse successResponse = addLogicalTable(logicalTableConfig);
+ return new ConfigSuccessResponse(successResponse.getStatus(),
logicalTableConfigAndUnrecognizedProps.getRight());
+ }
+
+ @PUT
+ @Produces(MediaType.APPLICATION_JSON)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Path("/logicalTables/{tableName}")
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.UPDATE_TABLE_CONFIG)
+ @Authenticate(AccessType.UPDATE)
+ @ApiOperation(value = "Update a logical table", notes = "Updates a logical
table")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Successfully updated schema"),
@ApiResponse(code = 404, message = "Schema "
+ + "not found"), @ApiResponse(code = 400, message = "Missing or invalid
request body"), @ApiResponse(code = 500,
+ message = "Internal error")
+ })
+ public SuccessResponse updateLogicalTable(
+ @ApiParam(value = "Name of the logical table", required = true)
@PathParam("tableName") String tableName,
+ @Context HttpHeaders headers, String logicalTableJsonString) {
+ Pair<LogicalTableConfig, Map<String, Object>>
logicalTableConfigAndUnrecognizedProps =
+ getLogicalAndUnrecognizedPropertiesFromJson(logicalTableJsonString);
+ LogicalTableConfig logicalTableConfig =
logicalTableConfigAndUnrecognizedProps.getLeft();
+
+
Preconditions.checkArgument(logicalTableConfig.getTableName().equals(tableName),
+ "Logical table name in the request body should match the table name in
the URL");
+
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ logicalTableConfig.setTableName(tableName);
+
+ SuccessResponse successResponse = updateLogicalTable(tableName,
logicalTableConfig);
+ return new ConfigSuccessResponse(successResponse.getStatus(),
logicalTableConfigAndUnrecognizedProps.getRight());
+ }
+
+ @DELETE
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/logicalTables/{tableName}")
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.DELETE_TABLE)
+ @Authenticate(AccessType.DELETE)
+ @ApiOperation(value = "Delete a logical table", notes = "Deletes a logical
table by name")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Successfully deleted logical
table"), @ApiResponse(code = 404, message =
+ "Logical table not found"), @ApiResponse(code = 500, message = "Error
deleting logical table")
+ })
+ public SuccessResponse deleteLogicalTable(
+ @ApiParam(value = "Logical table name", required = true)
@PathParam("tableName") String tableName,
+ @Context HttpHeaders headers) {
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ if (_pinotHelixResourceManager.deleteLogicalTable(tableName)) {
+ return new SuccessResponse(tableName + " logical table successfully
deleted.");
+ } else {
+ throw new ControllerApplicationException(LOGGER, "Failed to delete
logical table",
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ private Pair<LogicalTableConfig, Map<String, Object>>
getLogicalAndUnrecognizedPropertiesFromJson(
+ String logicalTableConfigJsonString)
+ throws ControllerApplicationException {
+ try {
+ return
JsonUtils.stringToObjectAndUnrecognizedProperties(logicalTableConfigJsonString,
LogicalTableConfig.class);
+ } catch (Exception e) {
+ String msg =
+ String.format("Invalid logical table json config: %s. Reason: %s",
logicalTableConfigJsonString,
+ e.getMessage());
+ throw new ControllerApplicationException(LOGGER, msg,
Response.Status.BAD_REQUEST, e);
+ }
+ }
+
+ private SuccessResponse addLogicalTable(LogicalTableConfig
logicalTableConfig) {
+ String tableName = logicalTableConfig.getTableName();
+ try {
+ if (StringUtils.isEmpty(logicalTableConfig.getBrokerTenant())) {
+ logicalTableConfig.setBrokerTenant(DEFAULT_BROKER_TENANT);
+ }
+
+ LogicalTableUtils.validateLogicalTableName(
+ logicalTableConfig,
+ _pinotHelixResourceManager.getAllTables(),
+ _pinotHelixResourceManager.getAllBrokerTenantNames()
+ );
+ _pinotHelixResourceManager.addLogicalTable(logicalTableConfig);
+ return new SuccessResponse(tableName + " logical table successfully
added.");
+ } catch (TableAlreadyExistsException e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.CONFLICT, e);
+ } catch (IllegalArgumentException e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.BAD_REQUEST, e);
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER,
+ "Failed to add new logical table " + tableName + ". Reason: " +
e.getMessage(),
+ Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
+ private SuccessResponse updateLogicalTable(String tableName,
LogicalTableConfig logicalTableConfig) {
+ try {
+ if (StringUtils.isEmpty(logicalTableConfig.getBrokerTenant())) {
+ logicalTableConfig.setBrokerTenant(DEFAULT_BROKER_TENANT);
+ }
+
+ LogicalTableUtils.validateLogicalTableName(
+ logicalTableConfig,
+ _pinotHelixResourceManager.getAllTables(),
+ _pinotHelixResourceManager.getAllBrokerTenantNames()
+ );
+ _pinotHelixResourceManager.updateLogicalTable(logicalTableConfig);
+ return new SuccessResponse(logicalTableConfig.getTableName() + " logical
table successfully updated.");
+ } catch (TableNotFoundException e) {
+ throw new ControllerApplicationException(LOGGER, "Failed to find logical
table " + tableName,
+ Response.Status.NOT_FOUND, e);
+ } catch (IllegalArgumentException e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.BAD_REQUEST, e);
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER,
+ "Failed to update logical table " + tableName + ". Reason: " +
e.getMessage(),
+ Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 830fd88055..9bdfb8a254 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -176,6 +176,7 @@ import org.apache.pinot.spi.config.user.ComponentType;
import org.apache.pinot.spi.config.user.RoleType;
import org.apache.pinot.spi.config.user.UserConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
@@ -2205,6 +2206,64 @@ public class PinotHelixResourceManager {
}
}
+ public void addLogicalTable(LogicalTableConfig logicalTableConfig)
+ throws TableAlreadyExistsException {
+ String tableName = logicalTableConfig.getTableName();
+ LOGGER.info("Adding logical table: {}", tableName);
+
+ // Check if the logical table name is already used
+ LogicalTableConfig existingLogicalTableConfig =
ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName);
+ if (existingLogicalTableConfig != null) {
+ throw new TableAlreadyExistsException("Logical table: " + tableName + "
already exists");
+ }
+
+ // Check if the table name is already used by a physical table
+
getAllTables().stream().map(TableNameBuilder::extractRawTableName).distinct().filter(tableName::equals)
+ .findFirst().ifPresent(tableNameWithType -> {
+ throw new TableAlreadyExistsException("Table name: " + tableName + "
already exists");
+ });
+
+ ZKMetadataProvider.setLogicalTableConfig(_propertyStore,
logicalTableConfig);
+ LOGGER.info("Added logical table: {}", tableName);
+ }
+
+ public void updateLogicalTable(LogicalTableConfig logicalTableConfig)
+ throws TableNotFoundException {
+ String tableName = logicalTableConfig.getTableName();
+ LOGGER.info("Updating logical table: {}", tableName);
+
+ LogicalTableConfig oldLogicalTableConfig =
ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName);
+ if (oldLogicalTableConfig == null) {
+ throw new TableNotFoundException("Logical table: " + tableName + " does
not exist");
+ }
+
+ ZKMetadataProvider.setLogicalTableConfig(_propertyStore,
logicalTableConfig);
+ LOGGER.info("Updated logical table: {}", tableName);
+ }
+
+ public boolean deleteLogicalTable(String tableName) {
+ LOGGER.info("Deleting logical table: {}", tableName);
+ boolean result = false;
+ String propertyStorePath =
ZKMetadataProvider.constructPropertyStorePathForLogical(tableName);
+ if (_propertyStore.exists(propertyStorePath, AccessOption.PERSISTENT)) {
+ result = _propertyStore.remove(propertyStorePath,
AccessOption.PERSISTENT);
+ } else {
+ throw new ControllerApplicationException(LOGGER,
+ "Logical table: " + tableName + " does not exists.",
Response.Status.NOT_FOUND);
+ }
+ LOGGER.info("Deleted logical table: {}", tableName);
+ return result;
+ }
+
+ public LogicalTableConfig getLogicalTableConfig(String tableName) {
+ return ZKMetadataProvider.getLogicalTableConfig(_propertyStore, tableName);
+ }
+
+ public List<String> getAllLogicalTableNames() {
+ return
ZKMetadataProvider.getAllLogicalTableConfigs(_propertyStore).stream().map(LogicalTableConfig::getTableName)
+ .collect(Collectors.toList());
+ }
+
/**
* Returns the ZK metdata for the given jobId and jobType
* @param jobId the id of the job
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotAdminUserLogicalTableResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotAdminUserLogicalTableResourceTest.java
new file mode 100644
index 0000000000..573e2984ef
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotAdminUserLogicalTableResourceTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import java.util.Map;
+import org.apache.pinot.controller.helix.ControllerRequestClient;
+
+
+public class PinotAdminUserLogicalTableResourceTest extends
PinotLogicalTableResourceTest {
+
+ public static final String AUTH_TOKEN = "Basic YWRtaW46dmVyeXNlY3JldA=====";
+ public static final Map<String, String> AUTH_HEADER =
Map.of("Authorization", AUTH_TOKEN);
+
+ @Override
+ protected void overrideControllerConf(Map<String, Object> properties) {
+ properties.put("controller.admin.access.control.factory.class",
+
"org.apache.pinot.controller.api.access.BasicAuthAccessControlFactory");
+ properties.put("controller.admin.access.control.principals", "admin");
+
properties.put("controller.admin.access.control.principals.admin.password",
"verysecret");
+ }
+
+ @Override
+ protected Map<String, String> getHeaders() {
+ return AUTH_HEADER;
+ }
+
+ @Override
+ public ControllerRequestClient getControllerRequestClient() {
+ if (_controllerRequestClient == null) {
+ _controllerRequestClient =
+ new ControllerRequestClient(_controllerRequestURLBuilder,
getHttpClient(), AUTH_HEADER);
+ }
+ return _controllerRequestClient;
+ }
+}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
new file mode 100644
index 0000000000..1f2d3a8204
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+
+public class PinotLogicalTableResourceTest extends ControllerTest {
+
+ private static final String LOGICAL_TABLE_NAME = "test_logical_table";
+ public static final String BROKER_TENANT = "DefaultTenant";
+ protected ControllerRequestURLBuilder _controllerRequestURLBuilder;
+
+ @BeforeClass
+ public void setUpClass()
+ throws Exception {
+ startZk();
+ startController();
+ addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+ addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+ _controllerRequestURLBuilder = getControllerRequestURLBuilder();
+ }
+
+ @AfterClass
+ public void tearDownClass() {
+ stopController();
+ stopZk();
+ }
+
+ @AfterMethod
+ public void tearDown() {
+ // cleans up the physical tables after each testcase
+ cleanup();
+ }
+
+ @DataProvider
+ public Object[][] tableNamesProvider() {
+ return new Object[][]{
+ {"test_logical_table", List.of("test_table_1", "test_table_2"),
List.of("test_table_3")},
+ {"test_logical_table", List.of("test_table_1", "db.test_table_2"),
List.of("test_table_3")},
+ {"test_logical_table", List.of("test_table_1", "test_table_2"),
List.of("db.test_table_3")},
+ {"test_logical_table", List.of("db.test_table_1", "db.test_table_2"),
List.of("db.test_table_3")},
+ {"test_table", List.of("db1.test_table", "db2.test_table"),
List.of("db3.test_table")},
+ {"db0.test_table", List.of("db1.test_table", "db2.test_table"),
List.of("db3.test_table")},
+ {"db.test_logical_table", List.of("test_table_1", "test_table_2"),
List.of("test_table_3")},
+ {"db.test_logical_table", List.of("test_table_1", "db.test_table_2"),
List.of("test_table_3")},
+ {"db.test_logical_table", List.of("test_table_1", "test_table_2"),
List.of("db.test_table_3")},
+ {"db.test_logical_table", List.of("db.test_table_1",
"db.test_table_2"), List.of("db.test_table_3")},
+ };
+ }
+
+ @Test(dataProvider = "tableNamesProvider")
+ public void testCreateUpdateDeleteLogicalTables(String logicalTableName,
List<String> physicalTableNames,
+ List<String> physicalTablesToUpdate)
+ throws IOException {
+ // verify logical table does not exist
+ String addLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableCreate();
+ String getLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableGet(logicalTableName);
+ String updateLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableUpdate(logicalTableName);
+ String deleteLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableDelete(logicalTableName);
+
+ // verify logical table does not exist
+ verifyLogicalTableDoesNotExists(getLogicalTableUrl);
+
+ // setup physical and logical tables
+ List<String> physicalTableNamesWithType =
createHybridTables(physicalTableNames);
+ LogicalTableConfig
+ logicalTableConfig = getDummyLogicalTableConfig(logicalTableName,
physicalTableNamesWithType, BROKER_TENANT);
+
+ // create logical table
+ String resp =
+ ControllerTest.sendPostRequest(addLogicalTableUrl,
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+ assertEquals(resp,
+ "{\"unrecognizedProperties\":{},\"status\":\"" + logicalTableName + "
logical table successfully added.\"}");
+
+ // verify logical table
+ verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig);
+
+ // update logical table and setup new physical tables
+ List<String> tableNameToUpdateWithType =
createHybridTables(physicalTablesToUpdate);
+ tableNameToUpdateWithType.addAll(physicalTableNamesWithType);
+ logicalTableConfig = getDummyLogicalTableConfig(logicalTableName,
tableNameToUpdateWithType, BROKER_TENANT);
+
+ String response =
+ ControllerTest.sendPutRequest(updateLogicalTableUrl,
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+ assertEquals(response,
+ "{\"unrecognizedProperties\":{},\"status\":\"" + logicalTableName + "
logical table successfully updated.\"}");
+
+ // verify updated logical table
+ verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig);
+
+ // delete logical table
+ String deleteResponse =
ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, getHeaders());
+ assertEquals(deleteResponse, "{\"status\":\"" + logicalTableName + "
logical table successfully deleted.\"}");
+
+ // verify logical table is deleted
+ verifyLogicalTableDoesNotExists(getLogicalTableUrl);
+ }
+
+ @Test
+ public void testLogicalTableValidationTests()
+ throws IOException {
+ String addLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableCreate();
+
+ // create physical tables
+ List<String> physicalTableNames = List.of("test_table_1", "test_table_2");
+ List<String> physicalTableNamesWithType =
createHybridTables(physicalTableNames);
+
+ // Test logical table name with _OFFLINE and _REALTIME is not allowed
+ LogicalTableConfig logicalTableConfig =
+ getDummyLogicalTableConfig("testLogicalTable_OFFLINE",
physicalTableNamesWithType, BROKER_TENANT);
+ try {
+ ControllerTest.sendPostRequest(addLogicalTableUrl,
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+ fail("Logical Table POST request should have failed");
+ } catch (IOException e) {
+ assertTrue(e.getMessage().contains("Reason: 'tableName' should not end
with _OFFLINE or _REALTIME"),
+ e.getMessage());
+ }
+
+ logicalTableConfig =
+ getDummyLogicalTableConfig("testLogicalTable_REALTIME",
physicalTableNamesWithType, BROKER_TENANT);
+ try {
+ ControllerTest.sendPostRequest(addLogicalTableUrl,
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+ fail("Logical Table POST request should have failed");
+ } catch (IOException e) {
+ assertTrue(e.getMessage().contains("Reason: 'tableName' should not end
with _OFFLINE or _REALTIME"),
+ e.getMessage());
+ }
+
+ // Test logical table name can not be same as existing physical table name
+ logicalTableConfig =
+ getDummyLogicalTableConfig("test_table_1", physicalTableNamesWithType,
BROKER_TENANT);
+ try {
+ ControllerTest.sendPostRequest(addLogicalTableUrl,
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+ fail("Logical Table POST request should have failed");
+ } catch (IOException e) {
+ assertTrue(e.getMessage().contains("Table name: test_table_1 already
exists"), e.getMessage());
+ }
+
+ // Test empty physical table names is not allowed
+ logicalTableConfig =
+ getDummyLogicalTableConfig(LOGICAL_TABLE_NAME, List.of(),
BROKER_TENANT);
+ try {
+ ControllerTest.sendPostRequest(addLogicalTableUrl,
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+ fail("Logical Table POST request should have failed");
+ } catch (IOException e) {
+ assertTrue(e.getMessage().contains("'physicalTableConfigMap' should not
be null or empty"), e.getMessage());
+ }
+
+ // Test all table names are physical table names and none is hybrid table
name
+ logicalTableConfig = getDummyLogicalTableConfig(LOGICAL_TABLE_NAME,
physicalTableNames, BROKER_TENANT);
+ try {
+ ControllerTest.sendPostRequest(addLogicalTableUrl,
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+ fail("Logical Table POST request should have failed");
+ } catch (IOException e) {
+ assertTrue(e.getMessage().contains("Reason: 'test_table_1' should be one
of the existing tables"),
+ e.getMessage());
+ }
+
+ // Test valid broker tenant is provided
+ logicalTableConfig = getDummyLogicalTableConfig(LOGICAL_TABLE_NAME,
physicalTableNamesWithType, "InvalidTenant");
+ try {
+ ControllerTest.sendPostRequest(addLogicalTableUrl,
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+ fail("Logical Table POST request should have failed");
+ } catch (IOException e) {
+ assertTrue(e.getMessage().contains("Reason: 'InvalidTenant' should be
one of the existing broker tenants"),
+ e.getMessage());
+ }
+ }
+
+ @Test
+ public void testLogicalTableWithSameNameNotAllowed()
+ throws IOException {
+ String addLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableCreate();
+ String getLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableGet(LOGICAL_TABLE_NAME);
+ List<String> physicalTableNamesWithType =
createHybridTables(List.of("test_table_1", "test_table_2"));
+
+ LogicalTableConfig
+ logicalTableConfig = getDummyLogicalTableConfig(LOGICAL_TABLE_NAME,
physicalTableNamesWithType, BROKER_TENANT);
+ ControllerTest.sendPostRequest(addLogicalTableUrl,
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+ verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig);
+ try {
+ // create the same logical table again
+ ControllerTest.sendPostRequest(addLogicalTableUrl,
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+ fail("Logical Table POST request should have failed");
+ } catch (IOException e) {
+ assertTrue(e.getMessage().contains("Logical table: test_logical_table
already exists"), e.getMessage());
+ }
+
+ // clean up the logical table
+ String deleteLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableDelete(LOGICAL_TABLE_NAME);
+ ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, getHeaders());
+ verifyLogicalTableDoesNotExists(getLogicalTableUrl);
+ }
+
+ @DataProvider
+ public Object[][] physicalTableShouldExistProvider() {
+ return new Object[][]{
+ {LOGICAL_TABLE_NAME, List.of("test_table_1"), "unknown_table_OFFLINE"},
+ {LOGICAL_TABLE_NAME, List.of("test_table_2"),
"unknown_table_REALTIME"},
+ {LOGICAL_TABLE_NAME, List.of("test_table_1"),
"db.test_table_1_OFFLINE"},
+ {LOGICAL_TABLE_NAME, List.of("test_table_2"),
"db.test_table_2_REALTIME"},
+ };
+ }
+
+ @Test(dataProvider = "physicalTableShouldExistProvider")
+ public void testPhysicalTableShouldExist(String logicalTableName,
List<String> physicalTableNames,
+ String unknownTableName)
+ throws IOException {
+ String addLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableCreate();
+
+ // setup physical tables
+ List<String> physicalTableNamesWithType =
createHybridTables(physicalTableNames);
+ physicalTableNamesWithType.add(unknownTableName);
+
+ // Test physical table should exist
+ LogicalTableConfig
+ logicalTableConfig = getDummyLogicalTableConfig(logicalTableName,
physicalTableNamesWithType, BROKER_TENANT);
+ try {
+ ControllerTest.sendPostRequest(addLogicalTableUrl,
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+ fail("Logical Table POST request should have failed");
+ } catch (IOException e) {
+ assertTrue(e.getMessage().contains("'" + unknownTableName + "' should be
one of the existing tables"),
+ e.getMessage());
+ }
+ }
+
+ @Test
+ public void testGetLogicalTableNames()
+ throws IOException {
+ ObjectMapper objectMapper = new ObjectMapper();
+ String getLogicalTableNamesUrl =
_controllerRequestURLBuilder.forLogicalTableNamesGet();
+ String response = ControllerTest.sendGetRequest(getLogicalTableNamesUrl,
getHeaders());
+ assertEquals(response, objectMapper.writeValueAsString(List.of()));
+
+ // setup physical tables and logical tables
+ List<String> logicalTableNames = List.of("db.test_logical_table_1",
"test_logical_table_2", "test_logical_table_3");
+ List<String> physicalTableNames = List.of("test_table_1", "test_table_2",
"db.test_table_3");
+ List<String> physicalTableNamesWithType =
createHybridTables(physicalTableNames);
+
+ for (int i = 0; i < logicalTableNames.size(); i++) {
+ LogicalTableConfig logicalTableConfig =
getDummyLogicalTableConfig(logicalTableNames.get(i), List.of(
+ physicalTableNamesWithType.get(2 * i),
physicalTableNamesWithType.get(2 * i + 1)), BROKER_TENANT);
+
+
ControllerTest.sendPostRequest(_controllerRequestURLBuilder.forLogicalTableCreate(),
+ logicalTableConfig.toSingleLineJsonString(), getHeaders());
+ }
+
+ // verify logical table names
+ String getLogicalTableNamesResponse =
ControllerTest.sendGetRequest(getLogicalTableNamesUrl, getHeaders());
+ assertEquals(getLogicalTableNamesResponse,
objectMapper.writeValueAsString(logicalTableNames));
+
+ // cleanup: delete logical tables
+ for (String logicalTableName : logicalTableNames) {
+ String deleteLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableDelete(logicalTableName);
+ String deleteResponse =
ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, getHeaders());
+ assertEquals(deleteResponse, "{\"status\":\"" + logicalTableName + "
logical table successfully deleted.\"}");
+ }
+ }
+
+ private void verifyLogicalTableExists(String getLogicalTableUrl,
LogicalTableConfig logicalTableConfig)
+ throws IOException {
+ LogicalTableConfig remoteLogicalTableConfig =
+
LogicalTableConfig.fromString(ControllerTest.sendGetRequest(getLogicalTableUrl,
getHeaders()));
+ assertEquals(remoteLogicalTableConfig, logicalTableConfig);
+ }
+
+ private void verifyLogicalTableDoesNotExists(String getLogicalTableUrl) {
+ try {
+ ControllerTest.sendGetRequest(getLogicalTableUrl, getHeaders());
+ fail("Logical Table GET request should have failed");
+ } catch (IOException e) {
+ assertTrue(e.getMessage().contains("Logical table not found"),
e.getMessage());
+ }
+ }
+
+ protected Map<String, String> getHeaders() {
+ return Map.of();
+ }
+}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java
new file mode 100644
index 0000000000..b8f0473267
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.controller.helix.ControllerRequestClient;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+
+public class PinotUserWithAccessLogicalTableResourceTest extends
ControllerTest {
+
+ public static final String AUTH_TOKEN = "Basic YWRtaW46dmVyeXNlY3JldA=====";
+ public static final String AUTH_TOKEN_USER = "Basic dXNlcjpzZWNyZXQ==";
+ public static final Map<String, String> AUTH_HEADER =
Map.of("Authorization", AUTH_TOKEN);
+ public static final Map<String, String> AUTH_HEADER_USER =
Map.of("Authorization", AUTH_TOKEN_USER);
+ public static final String LOGICAL_TABLE_NAME = "test_logical_table";
+
+ private Map<String, Object> getControllerConf(Object permissions) {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put("controller.admin.access.control.factory.class",
+
"org.apache.pinot.controller.api.access.BasicAuthAccessControlFactory");
+ properties.put("controller.admin.access.control.principals", "admin,user");
+
properties.put("controller.admin.access.control.principals.admin.password",
"verysecret");
+ properties.put("controller.admin.access.control.principals.user.password",
"secret");
+
properties.put("controller.admin.access.control.principals.user.permissions",
permissions);
+ return properties;
+ }
+
+ protected Map<String, String> getHeaders() {
+ return AUTH_HEADER_USER;
+ }
+
+ @Override
+ public ControllerRequestClient getControllerRequestClient() {
+ if (_controllerRequestClient == null) {
+ _controllerRequestClient =
+ new ControllerRequestClient(_controllerRequestURLBuilder,
getHttpClient(), AUTH_HEADER);
+ }
+ return _controllerRequestClient;
+ }
+
+ private void setup(Map<String, Object> properties)
+ throws Exception {
+ startZk();
+ Map<String, Object> configuration = getDefaultControllerConfiguration();
+ configuration.putAll(properties);
+ startController(configuration);
+ addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+ addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+ _controllerRequestURLBuilder = getControllerRequestURLBuilder();
+ }
+
+ @AfterMethod
+ private void tearDown() {
+ cleanup();
+ String deleteLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableDelete(LOGICAL_TABLE_NAME);
+ try {
+ ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, AUTH_HEADER);
+ } catch (Exception e) {
+ // ignore
+ }
+ stopController();
+ stopZk();
+ }
+
+ @DataProvider
+ public Object[][] permissionsProvider() {
+ return new Object[][]{
+ {"read,create"},
+ {"read,create,update"},
+ {"read,create,update,delete"}
+ };
+ }
+
+ @Test(dataProvider = "permissionsProvider")
+ public void testUserWithCreateAccess(String permissions)
+ throws Exception {
+ Map<String, Object> properties = getControllerConf(permissions);
+
+ setup(properties);
+
+ String addLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableCreate();
+ String getLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableGet(LOGICAL_TABLE_NAME);
+ String updateLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableUpdate(LOGICAL_TABLE_NAME);
+ String deleteLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableDelete(LOGICAL_TABLE_NAME);
+
+ List<String> physicalTableNames = List.of("test_table_1");
+ List<String> physicalTablesWithType =
createHybridTables(physicalTableNames);
+ LogicalTableConfig logicalTableConfig;
+
+ // create logical table
+ try {
+ logicalTableConfig = getDummyLogicalTableConfig(LOGICAL_TABLE_NAME,
physicalTablesWithType, "DefaultTenant");
+ String resp =
+ ControllerTest.sendPostRequest(addLogicalTableUrl,
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+ if (permissions.contains("create")) {
+ assertEquals(resp,
+ "{\"unrecognizedProperties\":{},\"status\":\"" + LOGICAL_TABLE_NAME
+ + " logical table successfully added.\"}");
+ verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig);
+ } else {
+ fail("Logical Table POST request should have failed");
+ }
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("Permission is denied for CREATE"),
e.getMessage());
+ }
+
+ // update logical table
+ try {
+
physicalTablesWithType.addAll(createHybridTables(List.of("test_table_2")));
+ logicalTableConfig = getDummyLogicalTableConfig(LOGICAL_TABLE_NAME,
physicalTablesWithType, "DefaultTenant");
+ String respUpdate = ControllerTest.sendPutRequest(
+ updateLogicalTableUrl, logicalTableConfig.toSingleLineJsonString(),
getHeaders()
+ );
+ if (permissions.contains("update")) {
+ assertEquals(respUpdate,
+ "{\"unrecognizedProperties\":{},\"status\":\"" + LOGICAL_TABLE_NAME
+ + " logical table successfully updated.\"}");
+ verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig);
+ } else {
+ fail("Logical Table POST request should have failed");
+ }
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("Permission is denied for UPDATE"),
e.getMessage());
+ }
+
+ // delete logical table
+ try {
+ String respDelete =
ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, getHeaders());
+ if (permissions.contains("delete")) {
+ assertEquals(respDelete, "{\"status\":\"" + LOGICAL_TABLE_NAME + "
logical table successfully deleted.\"}");
+ } else {
+ fail("Logical Table DELETE request should have failed");
+ }
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("Permission is denied for DELETE"),
e.getMessage());
+ }
+ }
+
+ private void verifyLogicalTableExists(String logicalTableNamesGet,
LogicalTableConfig logicalTableConfig)
+ throws IOException {
+ String respGet = ControllerTest.sendGetRequest(logicalTableNamesGet,
getHeaders());
+ LogicalTableConfig remoteTable = LogicalTableConfig.fromString(respGet);
+ assertEquals(remoteTable, logicalTableConfig);
+ }
+}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index a5b05031c6..effcf823b7 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -69,12 +69,15 @@ import
org.apache.pinot.controller.api.access.AllowAllAccessFactory;
import org.apache.pinot.controller.api.resources.PauseStatusDetails;
import org.apache.pinot.controller.api.resources.TableViews;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -82,6 +85,8 @@ import org.apache.pinot.spi.utils.CommonConstants.Helix;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
+import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.mockito.MockedStatic;
@@ -159,6 +164,21 @@ public class ControllerTest {
return DEFAULT_INSTANCE;
}
+ public List<String> createHybridTables(List<String> tableNames)
+ throws IOException {
+ List<String> tableNamesWithType = new ArrayList<>();
+ for (String tableName : tableNames) {
+ addDummySchema(tableName);
+ TableConfig offlineTable = createDummyTableConfig(tableName,
TableType.OFFLINE);
+ TableConfig realtimeTable = createDummyTableConfig(tableName,
TableType.REALTIME);
+ addTableConfig(offlineTable);
+ addTableConfig(realtimeTable);
+ tableNamesWithType.add(offlineTable.getTableName());
+ tableNamesWithType.add(realtimeTable.getTableName());
+ }
+ return tableNamesWithType;
+ }
+
public String getHelixClusterName() {
return _clusterName;
}
@@ -369,6 +389,19 @@ public class ControllerTest {
}
}
+ public static LogicalTableConfig getDummyLogicalTableConfig(String
tableName, List<String> physicalTableNames,
+ String brokerTenant) {
+ Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
+ for (String physicalTableName : physicalTableNames) {
+ physicalTableConfigMap.put(physicalTableName, new PhysicalTableConfig());
+ }
+ LogicalTableConfigBuilder builder = new LogicalTableConfigBuilder()
+ .setTableName(tableName)
+ .setBrokerTenant(brokerTenant)
+ .setPhysicalTableConfigMap(physicalTableConfigMap);
+ return builder.build();
+ }
+
public static class FakeBrokerResourceOnlineOfflineStateModelFactory extends
StateModelFactory<StateModel> {
private static final String STATE_MODEL_DEF =
"BrokerResourceOnlineOfflineStateModel";
@@ -649,6 +682,19 @@ public class ControllerTest {
return schema;
}
+ public static TableConfig createDummyTableConfig(String tableName, TableType
tableType) {
+ TableConfigBuilder builder = new TableConfigBuilder(tableType);
+ if (tableType == TableType.REALTIME) {
+
builder.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap());
+ }
+ return builder.setTableName(tableName)
+ .setTimeColumnName("timeColumn")
+ .setTimeType("DAYS")
+ .setRetentionTimeUnit("DAYS")
+ .setRetentionTimeValue("5")
+ .build();
+ }
+
public static Schema createDummySchemaWithPrimaryKey(String tableName) {
Schema schema = createDummySchema(tableName);
schema.setPrimaryKeyColumns(Collections.singletonList("dimA"));
@@ -1184,6 +1230,12 @@ public class ControllerTest {
* test functionality.
*/
public void cleanup() {
+ // Delete logical tables
+ List<String> logicalTables =
_helixResourceManager.getAllLogicalTableNames();
+ for (String logicalTableName : logicalTables) {
+ _helixResourceManager.deleteLogicalTable(logicalTableName);
+ }
+
// Delete all tables
List<String> tables = _helixResourceManager.getAllTables();
for (String tableNameWithType : tables) {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
index 56fe2464cf..dd5e217417 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
@@ -22,14 +22,18 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.spi.config.provider.LogicalTableConfigChangeListener;
import org.apache.pinot.spi.config.provider.SchemaChangeListener;
import org.apache.pinot.spi.config.provider.TableConfigChangeListener;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants.Segment.BuiltInVirtualColumn;
+import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
@@ -44,10 +48,14 @@ import static org.testng.Assert.*;
public class TableCacheTest {
private static final ControllerTest TEST_INSTANCE =
ControllerTest.getInstance();
private static final String RAW_TABLE_NAME = "cacheTestTable";
+ private static final String ANOTHER_TABLE = "anotherTable";
+ private static final String LOGICAL_TABLE_NAME = "cacheLogicalTestTable";
private static final String OFFLINE_TABLE_NAME =
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
private static final String REALTIME_TABLE_NAME =
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+ private static final String ANOTHER_TABLE_OFFLINE =
TableNameBuilder.OFFLINE.tableNameWithType(ANOTHER_TABLE);
private static final String MANGLED_RAW_TABLE_NAME = "cAcHeTeStTaBlE";
+ private static final String MANGLED_LOGICAL_TABLE_NAME =
"cAcHeLoGiCaLTeStTaBlE";
private static final String MANGLED_OFFLINE_TABLE_NAME =
MANGLED_RAW_TABLE_NAME + "_oFfLiNe";
@BeforeClass
@@ -65,6 +73,7 @@ public class TableCacheTest {
assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
assertNull(tableCache.getTableConfig(OFFLINE_TABLE_NAME));
assertNull(tableCache.getActualTableName(RAW_TABLE_NAME));
+ assertNull(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME));
// Add a schema
Schema schema =
@@ -107,18 +116,43 @@ public class TableCacheTest {
assertEquals(tableCache.getSchema(RAW_TABLE_NAME), expectedSchema);
assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME),
expectedColumnMap);
+ // Add logical table
+ LogicalTableConfig logicalTableConfig =
getLogicalTableConfig(LOGICAL_TABLE_NAME, List.of(OFFLINE_TABLE_NAME));
+
TEST_INSTANCE.getHelixResourceManager().addLogicalTable(logicalTableConfig);
+ // Wait for at most 10 seconds for the callback to add the logical table
to the cache
+ TestUtils.waitForCondition(aVoid ->
tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME) != null, 10_000L,
+ "Failed to add the logical table to the cache");
+ // Logical table can be accessed by the logical table name
+ if (isCaseInsensitive) {
+
assertEquals(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME),
LOGICAL_TABLE_NAME);
+
assertEquals(tableCache.getLogicalTableConfig(MANGLED_LOGICAL_TABLE_NAME),
logicalTableConfig);
+ assertEquals(tableCache.getSchema(MANGLED_LOGICAL_TABLE_NAME),
expectedSchema);
+ } else {
+
assertNull(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME));
+ }
+ assertEquals(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME),
logicalTableConfig);
+ assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME), expectedSchema);
+
// Register the change listeners
TestTableConfigChangeListener tableConfigChangeListener = new
TestTableConfigChangeListener();
assertTrue(tableCache.registerTableConfigChangeListener(tableConfigChangeListener));
assertEquals(tableConfigChangeListener._tableConfigList.size(), 1);
assertEquals(tableConfigChangeListener._tableConfigList.get(0),
tableConfig);
+
TestSchemaChangeListener schemaChangeListener = new
TestSchemaChangeListener();
assertTrue(tableCache.registerSchemaChangeListener(schemaChangeListener));
assertEquals(schemaChangeListener._schemaList.size(), 1);
assertEquals(schemaChangeListener._schemaList.get(0), expectedSchema);
+
+ TestLogicalTableConfigChangeListener logicalTableConfigChangeListener =
new TestLogicalTableConfigChangeListener();
+
assertTrue(tableCache.registerLogicalTableConfigChangeListener(logicalTableConfigChangeListener));
+
assertEquals(logicalTableConfigChangeListener._logicalTableConfigList.size(),
1);
+
assertEquals(logicalTableConfigChangeListener._logicalTableConfigList.get(0),
logicalTableConfig);
+
// Re-register the change listener should fail
assertFalse(tableCache.registerTableConfigChangeListener(tableConfigChangeListener));
assertFalse(tableCache.registerSchemaChangeListener(schemaChangeListener));
+
assertFalse(tableCache.registerLogicalTableConfigChangeListener(logicalTableConfigChangeListener));
// Update the schema
schema.addField(new DimensionFieldSpec("newColumn", DataType.LONG, true));
@@ -174,8 +208,34 @@ public class TableCacheTest {
// waitForEVToDisappear() call
TEST_INSTANCE.waitForEVToAppear(OFFLINE_TABLE_NAME);
+ // Update logical table config (create schema and table config for
anotherTable)
+ Schema anotherTableSchema =
+ new
Schema.SchemaBuilder().setSchemaName(ANOTHER_TABLE).addSingleValueDimension("testColumn",
DataType.INT)
+ .build();
+ TableConfig anotherTableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(ANOTHER_TABLE_OFFLINE).build();
+ TEST_INSTANCE.getHelixResourceManager().addSchema(anotherTableSchema,
false, false);
+ TEST_INSTANCE.getHelixResourceManager().addTable(anotherTableConfig);
+ TEST_INSTANCE.waitForEVToAppear(ANOTHER_TABLE_OFFLINE);
+ // Wait for at most 10 seconds for the callback to add the table config to
the cache
+ TestUtils.waitForCondition(
+ aVoid ->
anotherTableConfig.equals(tableCache.getTableConfig(ANOTHER_TABLE_OFFLINE)),
10_000L,
+ "Failed to add the table config to the cache");
+ // update the logical table
+ logicalTableConfig = getLogicalTableConfig(LOGICAL_TABLE_NAME,
List.of(OFFLINE_TABLE_NAME, ANOTHER_TABLE_OFFLINE));
+
TEST_INSTANCE.getHelixResourceManager().updateLogicalTable(logicalTableConfig);
+ if (isCaseInsensitive) {
+
assertEquals(tableCache.getLogicalTableConfig(MANGLED_LOGICAL_TABLE_NAME),
logicalTableConfig);
+ assertEquals(tableCache.getSchema(MANGLED_LOGICAL_TABLE_NAME),
expectedSchema);
+ } else {
+
assertNull(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME));
+ }
+ assertEquals(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME),
logicalTableConfig);
+ assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME), expectedSchema);
+
// Remove the table config
TEST_INSTANCE.getHelixResourceManager().deleteOfflineTable(RAW_TABLE_NAME);
+
TEST_INSTANCE.getHelixResourceManager().deleteOfflineTable(ANOTHER_TABLE_OFFLINE);
// Wait for at most 10 seconds for the callback to remove the table config
from the cache
// NOTE:
// - Verify if the callback is fully done by checking the table config
change lister because it is the last step of
@@ -183,27 +243,55 @@ public class TableCacheTest {
TestUtils.waitForCondition(aVoid ->
tableConfigChangeListener._tableConfigList.isEmpty(), 10_000L,
"Failed to remove the table config from the cache");
assertNull(tableCache.getTableConfig(OFFLINE_TABLE_NAME));
+ assertNull(tableCache.getTableConfig(ANOTHER_TABLE_OFFLINE));
assertNull(tableCache.getActualTableName(RAW_TABLE_NAME));
assertEquals(tableCache.getSchema(RAW_TABLE_NAME), expectedSchema);
assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME),
expectedColumnMap);
// Remove the schema
TEST_INSTANCE.getHelixResourceManager().deleteSchema(RAW_TABLE_NAME);
+ TEST_INSTANCE.getHelixResourceManager().deleteSchema(ANOTHER_TABLE);
// Wait for at most 10 seconds for the callback to remove the schema from
the cache
// NOTE:
// - Verify if the callback is fully done by checking the schema change
lister because it is the last step of the
// callback handling
TestUtils.waitForCondition(aVoid ->
schemaChangeListener._schemaList.isEmpty(), 10_000L,
"Failed to remove the schema from the cache");
+
+ // Remove logical table
+
TEST_INSTANCE.getHelixResourceManager().deleteLogicalTable(LOGICAL_TABLE_NAME);
+ // Wait for at most 10 seconds for the callback to remove the logical
table from the cache
+ // NOTE:
+ // - Verify if the callback is fully done by checking the logical table
change lister because it is the last step of
+ // the callback handling
+ TestUtils.waitForCondition(aVoid ->
logicalTableConfigChangeListener._logicalTableConfigList.isEmpty(), 10_000L,
+ "Failed to remove the logical table from the cache");
+
assertNull(tableCache.getSchema(RAW_TABLE_NAME));
+ assertNull(tableCache.getSchema(ANOTHER_TABLE));
assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
assertNull(tableCache.getSchema(RAW_TABLE_NAME));
assertNull(tableCache.getColumnNameMap(RAW_TABLE_NAME));
+ assertNull(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME));
assertEquals(schemaChangeListener._schemaList.size(), 0);
assertEquals(tableConfigChangeListener._tableConfigList.size(), 0);
+
assertEquals(logicalTableConfigChangeListener._logicalTableConfigList.size(),
0);
// Wait for external view to disappear to ensure a clean start for the
next test
TEST_INSTANCE.waitForEVToDisappear(OFFLINE_TABLE_NAME);
+ TEST_INSTANCE.waitForEVToDisappear(ANOTHER_TABLE_OFFLINE);
+ }
+
+ private static LogicalTableConfig getLogicalTableConfig(String tableName,
List<String> physicalTableNames) {
+ Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
+ for (String physicalTableName : physicalTableNames) {
+ physicalTableConfigMap.put(physicalTableName, new PhysicalTableConfig());
+ }
+ LogicalTableConfigBuilder builder = new LogicalTableConfigBuilder()
+ .setTableName(tableName)
+ .setBrokerTenant("DefaultTenant")
+ .setPhysicalTableConfigMap(physicalTableConfigMap);
+ return builder.build();
}
@DataProvider(name = "testTableCacheDataProvider")
@@ -229,6 +317,15 @@ public class TableCacheTest {
}
}
+ private static class TestLogicalTableConfigChangeListener implements
LogicalTableConfigChangeListener {
+ private volatile List<LogicalTableConfig> _logicalTableConfigList;
+
+ @Override
+ public void onChange(List<LogicalTableConfig> logicalTableConfigList) {
+ _logicalTableConfigList = logicalTableConfigList;
+ }
+ }
+
@AfterClass
public void tearDown() {
TEST_INSTANCE.cleanup();
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/LogicalTableConfigChangeListener.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/LogicalTableConfigChangeListener.java
new file mode 100644
index 0000000000..adf4b990db
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/LogicalTableConfigChangeListener.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.provider;
+
+import java.util.List;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+
+
+public interface LogicalTableConfigChangeListener {
+ /**
+ * The callback to be invoked on logical table changes
+ * @param logicalTableConfigList the entire list of logical tables in the
cluster
+ */
+ void onChange(List<LogicalTableConfig> logicalTableConfigList);
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/PinotConfigProvider.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/PinotConfigProvider.java
index 0400fe0498..64de5ada56 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/PinotConfigProvider.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/provider/PinotConfigProvider.java
@@ -20,6 +20,7 @@ package org.apache.pinot.spi.config.provider;
import java.util.List;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
@@ -57,4 +58,19 @@ public interface PinotConfigProvider {
* registered.
*/
boolean registerSchemaChangeListener(SchemaChangeListener
schemaChangeListener);
+
+ /**
+ * Returns the logical table config for the given logical table name.
+ * @param logicalTableName the name of the logical table
+ * @return the logical table
+ */
+ LogicalTableConfig getLogicalTableConfig(String logicalTableName);
+
+ /**
+ * Registers the {@link LogicalTableConfigChangeListener} and notifies it
whenever any changes (addition, update,
+ * @param logicalTableConfigChangeListener the listener to be registered
+ * @return {@code true} if the listener is successfully registered, {@code
false} if the listener is already
+ * registered.
+ */
+ boolean
registerLogicalTableConfigChangeListener(LogicalTableConfigChangeListener
logicalTableConfigChangeListener);
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java
new file mode 100644
index 0000000000..4d477691d1
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/LogicalTableConfig.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class LogicalTableConfig extends BaseJsonConfig {
+
+ private static final ObjectMapper DEFAULT_MAPPER = new ObjectMapper();
+
+ public static final String LOGICAL_TABLE_NAME_KEY = "tableName";
+ public static final String PHYSICAL_TABLE_CONFIG_KEY =
"physicalTableConfigMap";
+ public static final String BROKER_TENANT_KEY = "brokerTenant";
+
+ private String _tableName;
+ private String _brokerTenant;
+ private Map<String, PhysicalTableConfig> _physicalTableConfigMap;
+
+ public static LogicalTableConfig fromString(String logicalTableString)
+ throws IOException {
+ return JsonUtils.stringToObject(logicalTableString,
LogicalTableConfig.class);
+ }
+
+ public String getTableName() {
+ return _tableName;
+ }
+
+ public void setTableName(String tableName) {
+ _tableName = tableName;
+ }
+
+ public Map<String, PhysicalTableConfig> getPhysicalTableConfigMap() {
+ return _physicalTableConfigMap;
+ }
+
+ public void setPhysicalTableConfigMap(
+ Map<String, PhysicalTableConfig> physicalTableConfigMap) {
+ _physicalTableConfigMap = physicalTableConfigMap;
+ }
+
+ public String getBrokerTenant() {
+ return _brokerTenant;
+ }
+
+ public void setBrokerTenant(String brokerTenant) {
+ _brokerTenant = brokerTenant;
+ }
+
+ private JsonNode toJsonObject() {
+ return DEFAULT_MAPPER.valueToTree(this);
+ }
+
+ /**
+ * Returns a single-line json string representation of the schema.
+ */
+ public String toSingleLineJsonString() {
+ return toJsonObject().toString();
+ }
+
+ /**
+ * Returns a pretty json string representation of the schema.
+ */
+ public String toPrettyJsonString() {
+ try {
+ return JsonUtils.objectToPrettyString(toJsonObject());
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object) {
+ return true;
+ }
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+ LogicalTableConfig that = (LogicalTableConfig) object;
+ return Objects.equals(getTableName(), that.getTableName());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getTableName());
+ }
+
+ @Override
+ public String toString() {
+ return toSingleLineJsonString();
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/PhysicalTableConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/PhysicalTableConfig.java
new file mode 100644
index 0000000000..c86fcf97dc
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/PhysicalTableConfig.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data;
+
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+
+/**
+ * This class represents the configuration for a physical table in {@link
LogicalTableConfig}.
+ * This is empty by design and more docs would be added as features are added.
+ */
+public class PhysicalTableConfig extends BaseJsonConfig {
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 2804eac53e..eb1b7d3e17 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -633,4 +633,24 @@ public class ControllerRequestURLBuilder {
public String forIdealState(String tableName) {
return StringUtil.join("/", _baseUrl, "tables", tableName, "idealstate");
}
+
+ public String forLogicalTableCreate() {
+ return StringUtil.join("/", _baseUrl, "logicalTables");
+ }
+
+ public String forLogicalTableUpdate(String logicalTableName) {
+ return StringUtil.join("/", _baseUrl, "logicalTables", logicalTableName);
+ }
+
+ public String forLogicalTableGet(String logicalTableName) {
+ return StringUtil.join("/", _baseUrl, "logicalTables", logicalTableName);
+ }
+
+ public String forLogicalTableNamesGet() {
+ return StringUtil.join("/", _baseUrl, "logicalTables");
+ }
+
+ public String forLogicalTableDelete(String logicalTableName) {
+ return StringUtil.join("/", _baseUrl, "logicalTables", logicalTableName);
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java
new file mode 100644
index 0000000000..eff47c5af6
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/LogicalTableConfigBuilder.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.utils.builder;
+
+import java.util.Map;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
+
+
+public class LogicalTableConfigBuilder {
+ private String _tableName;
+ private Map<String, PhysicalTableConfig> _physicalTableConfigMap;
+ private String _brokerTenant;
+
+ public LogicalTableConfigBuilder setTableName(String tableName) {
+ _tableName = tableName;
+ return this;
+ }
+
+ public LogicalTableConfigBuilder setPhysicalTableConfigMap(Map<String,
PhysicalTableConfig> physicalTableConfigMap) {
+ _physicalTableConfigMap = physicalTableConfigMap;
+ return this;
+ }
+
+ public LogicalTableConfigBuilder setBrokerTenant(String brokerTenant) {
+ _brokerTenant = brokerTenant;
+ return this;
+ }
+
+ public LogicalTableConfig build() {
+ LogicalTableConfig logicalTableConfig = new LogicalTableConfig();
+ logicalTableConfig.setTableName(_tableName);
+ logicalTableConfig.setPhysicalTableConfigMap(_physicalTableConfigMap);
+ logicalTableConfig.setBrokerTenant(_brokerTenant);
+ return logicalTableConfig;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]