This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 980b5d04b3 Logical table schema enforcement. (#15733)
980b5d04b3 is described below
commit 980b5d04b3219c933fc18f13d3a9ed8c2fb63cdb
Author: Abhishek Bafna <[email protected]>
AuthorDate: Fri May 9 11:35:05 2025 +0530
Logical table schema enforcement. (#15733)
---
.../pinot/common/config/provider/TableCache.java | 19 -------
.../pinot/common/metadata/ZKMetadataProvider.java | 11 ++++
.../pinot/common/utils/LogicalTableUtils.java | 24 ++++++---
.../PinotHelixPropertyStoreZnRecordProvider.java | 6 ++-
.../api/resources/PinotLogicalTableResource.java | 26 ++--------
.../helix/core/PinotHelixResourceManager.java | 35 ++++++++++++-
.../resources/PinotLogicalTableResourceTest.java | 47 ++++++++++++-----
...inotUserWithAccessLogicalTableResourceTest.java | 2 +
.../pinot/controller/helix/ControllerTest.java | 2 +-
.../pinot/controller/helix/TableCacheTest.java | 60 +++++++++++++---------
10 files changed, 142 insertions(+), 90 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
index 726df76930..0b12a64282 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -279,28 +278,10 @@ public class TableCache implements PinotConfigProvider {
@Nullable
@Override
public Schema getSchema(String rawTableName) {
- if (_schemaInfoMap.containsKey(rawTableName)) {
- return getPhysicalTableSchema(rawTableName);
- } else {
- return getLogicalTableSchema(rawTableName);
- }
- }
-
- private Schema getPhysicalTableSchema(String rawTableName) {
SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
return schemaInfo != null ? schemaInfo._schema : null;
}
- @Nullable
- private Schema getLogicalTableSchema(String logicalTableName) {
- LogicalTableConfig logicalTableConfig =
getLogicalTableConfig(logicalTableName);
- if (logicalTableConfig == null) {
- return null;
- }
- Optional<String> physicalTableName =
logicalTableConfig.getPhysicalTableConfigMap().keySet().stream().findFirst();
- return
getPhysicalTableSchema(TableNameBuilder.extractRawTableName(physicalTableName.orElse(null)));
- }
-
@Override
public boolean registerSchemaChangeListener(SchemaChangeListener
schemaChangeListener) {
synchronized (_zkSchemaChangeListener) {
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 04a31f4bf4..e1f21ceea4 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -642,6 +642,17 @@ public class ZKMetadataProvider {
}
}
+ /**
+ * Check if the schema exists in the property store.
+ *
+ * @param propertyStore Helix property store
+ * @param schemaName Schema name
+ * @return true if the schema exists, false otherwise
+ */
+ public static boolean isSchemaExists(ZkHelixPropertyStore<ZNRecord>
propertyStore, String schemaName) {
+ return
propertyStore.exists(constructPropertyStorePathForSchema(schemaName),
AccessOption.PERSISTENT);
+ }
+
/**
* Get the schema associated with the given table name.
*
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java
index 0b5d715bd9..be92ffdeb0 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/LogicalTableUtils.java
@@ -21,11 +21,12 @@ package org.apache.pinot.common.utils;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.Set;
+import java.util.function.Predicate;
import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.PhysicalTableConfig;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -73,8 +74,11 @@ public class LogicalTableUtils {
return record;
}
- public static void validateLogicalTableName(LogicalTableConfig
logicalTableConfig, List<String> allPhysicalTables,
- Set<String> allBrokerTenantNames) {
+ public static void validateLogicalTableName(
+ LogicalTableConfig logicalTableConfig,
+ Predicate<String> physicalTableExistsPredicate,
+ Predicate<String> brokerTenantExistsPredicate,
+ ZkHelixPropertyStore<ZNRecord> propertyStore) {
String tableName = logicalTableConfig.getTableName();
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException("Invalid logical table name. Reason:
'tableName' should not be null or empty");
@@ -96,7 +100,7 @@ public class LogicalTableUtils {
PhysicalTableConfig physicalTableConfig = entry.getValue();
// validate physical table exists
- if (!allPhysicalTables.contains(physicalTableName)) {
+ if (!physicalTableExistsPredicate.test(physicalTableName)) {
throw new IllegalArgumentException(
"Invalid logical table. Reason: '" + physicalTableName + "' should
be one of the existing tables");
}
@@ -108,11 +112,17 @@ public class LogicalTableUtils {
}
}
- // validate broker tenant
+ // validate broker tenant exists
String brokerTenant = logicalTableConfig.getBrokerTenant();
- if (!allBrokerTenantNames.contains(brokerTenant)) {
+ if (!brokerTenantExistsPredicate.test(brokerTenant)) {
throw new IllegalArgumentException(
"Invalid logical table. Reason: '" + brokerTenant + "' should be one
of the existing broker tenants");
}
+
+ // Validate schema with same name as logical table exists
+ if (!ZKMetadataProvider.isSchemaExists(propertyStore, tableName)) {
+ throw new IllegalArgumentException(
+ "Invalid logical table. Reason: Schema with same name as logical
table '" + tableName + "' does not exist");
+ }
}
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotHelixPropertyStoreZnRecordProvider.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotHelixPropertyStoreZnRecordProvider.java
index 7ff2d6da2c..fc25ef4dd3 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotHelixPropertyStoreZnRecordProvider.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/PinotHelixPropertyStoreZnRecordProvider.java
@@ -44,13 +44,17 @@ public class PinotHelixPropertyStoreZnRecordProvider {
}
public static PinotHelixPropertyStoreZnRecordProvider
forTable(ZkHelixPropertyStore<ZNRecord> propertyStore) {
- return new PinotHelixPropertyStoreZnRecordProvider(propertyStore,
"/CONFIGS/TABLES");
+ return new PinotHelixPropertyStoreZnRecordProvider(propertyStore,
"/CONFIGS/TABLE");
}
public static PinotHelixPropertyStoreZnRecordProvider
forSegments(ZkHelixPropertyStore<ZNRecord> propertyStore) {
return new PinotHelixPropertyStoreZnRecordProvider(propertyStore,
"/SEGMENTS");
}
+ public static PinotHelixPropertyStoreZnRecordProvider
forLogicalTable(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ return new PinotHelixPropertyStoreZnRecordProvider(propertyStore,
"/LOGICAL/TABLE");
+ }
+
public ZNRecord get(String name) {
return _propertyStore.get(_pathPrefix + "/" + name, null,
AccessOption.PERSISTENT);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java
index 844d9aea24..cf9cf3b2d4 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResource.java
@@ -43,11 +43,9 @@ import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.arrow.util.Preconditions;
-import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.common.utils.DatabaseUtils;
-import org.apache.pinot.common.utils.LogicalTableUtils;
import org.apache.pinot.controller.api.access.AccessControlFactory;
import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.api.access.Authenticate;
@@ -82,7 +80,6 @@ import static
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K
@Path("/")
public class PinotLogicalTableResource {
public static final Logger LOGGER =
LoggerFactory.getLogger(PinotLogicalTableResource.class);
- private static final String DEFAULT_BROKER_TENANT = "DefaultTenant";
@Inject
PinotHelixResourceManager _pinotHelixResourceManager;
@@ -172,7 +169,7 @@ public class PinotLogicalTableResource {
tableName = DatabaseUtils.translateTableName(tableName, headers);
logicalTableConfig.setTableName(tableName);
- SuccessResponse successResponse = updateLogicalTable(tableName,
logicalTableConfig);
+ SuccessResponse successResponse = updateLogicalTable(logicalTableConfig);
return new ConfigSuccessResponse(successResponse.getStatus(),
logicalTableConfigAndUnrecognizedProps.getRight());
}
@@ -214,15 +211,6 @@ public class PinotLogicalTableResource {
private SuccessResponse addLogicalTable(LogicalTableConfig
logicalTableConfig) {
String tableName = logicalTableConfig.getTableName();
try {
- if (StringUtils.isEmpty(logicalTableConfig.getBrokerTenant())) {
- logicalTableConfig.setBrokerTenant(DEFAULT_BROKER_TENANT);
- }
-
- LogicalTableUtils.validateLogicalTableName(
- logicalTableConfig,
- _pinotHelixResourceManager.getAllTables(),
- _pinotHelixResourceManager.getAllBrokerTenantNames()
- );
_pinotHelixResourceManager.addLogicalTableConfig(logicalTableConfig);
return new SuccessResponse(tableName + " logical table successfully
added.");
} catch (TableAlreadyExistsException e) {
@@ -236,17 +224,9 @@ public class PinotLogicalTableResource {
}
}
- private SuccessResponse updateLogicalTable(String tableName,
LogicalTableConfig logicalTableConfig) {
+ private SuccessResponse updateLogicalTable(LogicalTableConfig
logicalTableConfig) {
+ String tableName = logicalTableConfig.getTableName();
try {
- if (StringUtils.isEmpty(logicalTableConfig.getBrokerTenant())) {
- logicalTableConfig.setBrokerTenant(DEFAULT_BROKER_TENANT);
- }
-
- LogicalTableUtils.validateLogicalTableName(
- logicalTableConfig,
- _pinotHelixResourceManager.getAllTables(),
- _pinotHelixResourceManager.getAllBrokerTenantNames()
- );
_pinotHelixResourceManager.updateLogicalTableConfig(logicalTableConfig);
return new SuccessResponse(logicalTableConfig.getTableName() + " logical
table successfully updated.");
} catch (TableNotFoundException e) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 8ff276c20b..ad62a01227 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -127,6 +127,7 @@ import org.apache.pinot.common.utils.BcryptUtils;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.LogicalTableUtils;
import org.apache.pinot.common.utils.config.AccessControlUserConfigUtils;
import org.apache.pinot.common.utils.config.InstanceUtils;
import org.apache.pinot.common.utils.config.TableConfigUtils;
@@ -1649,6 +1650,12 @@ public class PinotHelixResourceManager {
return ZKMetadataProvider.getTableSchema(_propertyStore, tableConfig);
}
+ public List<String> getAllSchemaNames() {
+ return _propertyStore.getChildNames(
+
PinotHelixPropertyStoreZnRecordProvider.forSchema(_propertyStore).getRelativePath(),
AccessOption.PERSISTENT
+ );
+ }
+
public List<String> getSchemaNames() {
return getSchemaNames(null);
}
@@ -1825,6 +1832,17 @@ public class PinotHelixResourceManager {
String tableName = logicalTableConfig.getTableName();
LOGGER.info("Adding logical table {}: Start", tableName);
+ if (StringUtils.isEmpty(logicalTableConfig.getBrokerTenant())) {
+ logicalTableConfig.setBrokerTenant("DefaultTenant");
+ }
+
+ LogicalTableUtils.validateLogicalTableName(
+ logicalTableConfig,
+
PinotHelixPropertyStoreZnRecordProvider.forTable(_propertyStore)::exist,
+ getAllBrokerTenantNames()::contains,
+ _propertyStore
+ );
+
// Check if the logical table name is already used
if (ZKMetadataProvider.isLogicalTableExists(_propertyStore, tableName)) {
throw new TableAlreadyExistsException("Logical table: " + tableName + "
already exists");
@@ -2102,6 +2120,17 @@ public class PinotHelixResourceManager {
String tableName = logicalTableConfig.getTableName();
LOGGER.info("Updating logical table {}: Start", tableName);
+ if (StringUtils.isEmpty(logicalTableConfig.getBrokerTenant())) {
+ logicalTableConfig.setBrokerTenant("DefaultTenant");
+ }
+
+ LogicalTableUtils.validateLogicalTableName(
+ logicalTableConfig,
+
PinotHelixPropertyStoreZnRecordProvider.forTable(_propertyStore)::exist,
+ getAllBrokerTenantNames()::contains,
+ _propertyStore
+ );
+
if (!ZKMetadataProvider.isLogicalTableExists(_propertyStore, tableName)) {
throw new TableNotFoundException("Logical table: " + tableName + " does
not exist");
}
@@ -2314,8 +2343,10 @@ public class PinotHelixResourceManager {
}
public List<String> getAllLogicalTableNames() {
- return
ZKMetadataProvider.getAllLogicalTableConfigs(_propertyStore).stream().map(LogicalTableConfig::getTableName)
- .collect(Collectors.toList());
+ List<String> logicalTableNames = _propertyStore.getChildNames(
+
PinotHelixPropertyStoreZnRecordProvider.forLogicalTable(_propertyStore).getRelativePath(),
+ AccessOption.PERSISTENT);
+ return logicalTableNames != null ? logicalTableNames :
Collections.emptyList();
}
/**
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
index 1f2d3a8204..d6df95e5cb 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotLogicalTableResourceTest.java
@@ -84,6 +84,7 @@ public class PinotLogicalTableResourceTest extends
ControllerTest {
public void testCreateUpdateDeleteLogicalTables(String logicalTableName,
List<String> physicalTableNames,
List<String> physicalTablesToUpdate)
throws IOException {
+ addDummySchema(logicalTableName);
// verify logical table does not exist
String addLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableCreate();
String getLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableGet(logicalTableName);
@@ -199,15 +200,46 @@ public class PinotLogicalTableResourceTest extends
ControllerTest {
}
}
+ @Test
+ public void testLogicalTableSchemaValidation()
+ throws IOException {
+ String addLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableCreate();
+ List<String> physicalTableNamesWithType =
createHybridTables(List.of("test_table_3"));
+
+ // Test logical table schema does not exist
+ LogicalTableConfig logicalTableConfig =
+ getDummyLogicalTableConfig(LOGICAL_TABLE_NAME,
physicalTableNamesWithType, BROKER_TENANT);
+ try {
+ ControllerTest.sendPostRequest(addLogicalTableUrl,
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+ fail("Logical Table POST request should have failed");
+ } catch (IOException e) {
+ assertTrue(e.getMessage().contains("Reason: Schema with same name as
logical table '" + LOGICAL_TABLE_NAME
+ + "' does not exist"), e.getMessage());
+ }
+
+ // Test logical table with db prefix but schema without db prefix
+ addDummySchema(LOGICAL_TABLE_NAME);
+ logicalTableConfig = getDummyLogicalTableConfig("db." +
LOGICAL_TABLE_NAME, physicalTableNamesWithType,
+ BROKER_TENANT);
+ try {
+ ControllerTest.sendPostRequest(addLogicalTableUrl,
logicalTableConfig.toSingleLineJsonString(), getHeaders());
+ fail("Logical Table POST request should have failed");
+ } catch (IOException e) {
+ assertTrue(e.getMessage().contains("Reason: Schema with same name as
logical table 'db." + LOGICAL_TABLE_NAME
+ + "' does not exist"), e.getMessage());
+ }
+ }
+
@Test
public void testLogicalTableWithSameNameNotAllowed()
throws IOException {
String addLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableCreate();
String getLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableGet(LOGICAL_TABLE_NAME);
- List<String> physicalTableNamesWithType =
createHybridTables(List.of("test_table_1", "test_table_2"));
+ List<String> physicalTableNamesWithType =
createHybridTables(List.of("test_table_2"));
LogicalTableConfig
logicalTableConfig = getDummyLogicalTableConfig(LOGICAL_TABLE_NAME,
physicalTableNamesWithType, BROKER_TENANT);
+ addDummySchema(LOGICAL_TABLE_NAME);
ControllerTest.sendPostRequest(addLogicalTableUrl,
logicalTableConfig.toSingleLineJsonString(), getHeaders());
verifyLogicalTableExists(getLogicalTableUrl, logicalTableConfig);
try {
@@ -217,11 +249,6 @@ public class PinotLogicalTableResourceTest extends
ControllerTest {
} catch (IOException e) {
assertTrue(e.getMessage().contains("Logical table: test_logical_table
already exists"), e.getMessage());
}
-
- // clean up the logical table
- String deleteLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableDelete(LOGICAL_TABLE_NAME);
- ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, getHeaders());
- verifyLogicalTableDoesNotExists(getLogicalTableUrl);
}
@DataProvider
@@ -270,6 +297,7 @@ public class PinotLogicalTableResourceTest extends
ControllerTest {
List<String> physicalTableNamesWithType =
createHybridTables(physicalTableNames);
for (int i = 0; i < logicalTableNames.size(); i++) {
+ addDummySchema(logicalTableNames.get(i));
LogicalTableConfig logicalTableConfig =
getDummyLogicalTableConfig(logicalTableNames.get(i), List.of(
physicalTableNamesWithType.get(2 * i),
physicalTableNamesWithType.get(2 * i + 1)), BROKER_TENANT);
@@ -280,13 +308,6 @@ public class PinotLogicalTableResourceTest extends
ControllerTest {
// verify logical table names
String getLogicalTableNamesResponse =
ControllerTest.sendGetRequest(getLogicalTableNamesUrl, getHeaders());
assertEquals(getLogicalTableNamesResponse,
objectMapper.writeValueAsString(logicalTableNames));
-
- // cleanup: delete logical tables
- for (String logicalTableName : logicalTableNames) {
- String deleteLogicalTableUrl =
_controllerRequestURLBuilder.forLogicalTableDelete(logicalTableName);
- String deleteResponse =
ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, getHeaders());
- assertEquals(deleteResponse, "{\"status\":\"" + logicalTableName + "
logical table successfully deleted.\"}");
- }
}
private void verifyLogicalTableExists(String getLogicalTableUrl,
LogicalTableConfig logicalTableConfig)
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java
index b8f0473267..62632f1a6a 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotUserWithAccessLogicalTableResourceTest.java
@@ -75,6 +75,8 @@ public class PinotUserWithAccessLogicalTableResourceTest
extends ControllerTest
addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
addFakeServerInstancesToAutoJoinHelixCluster(1, true);
_controllerRequestURLBuilder = getControllerRequestURLBuilder();
+ // create schema for logical table
+ addDummySchema(LOGICAL_TABLE_NAME);
}
@AfterMethod
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 119c774fde..6f7c06260a 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -1254,7 +1254,7 @@ public class ControllerTest {
}, 60_000L, "Failed to clean up all the external views");
// Delete all schemas.
- List<String> schemaNames = _helixResourceManager.getSchemaNames();
+ List<String> schemaNames = _helixResourceManager.getAllSchemaNames();
if (CollectionUtils.isNotEmpty(schemaNames)) {
for (String schemaName : schemaNames) {
getHelixResourceManager().deleteSchema(schemaName);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
index 9690fd6df2..a10b2f0aea 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/TableCacheTest.java
@@ -23,6 +23,8 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.exception.SchemaAlreadyExistsException;
+import org.apache.pinot.common.exception.SchemaBackwardIncompatibleException;
import org.apache.pinot.spi.config.provider.LogicalTableConfigChangeListener;
import org.apache.pinot.spi.config.provider.SchemaChangeListener;
import org.apache.pinot.spi.config.provider.TableConfigChangeListener;
@@ -77,20 +79,10 @@ public class TableCacheTest {
assertNull(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME));
// Add a schema
- Schema schema =
- new
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).addSingleValueDimension("testColumn",
DataType.INT)
- .build();
- TEST_INSTANCE.getHelixResourceManager().addSchema(schema, false, false);
- // Wait for at most 10 seconds for the callback to add the schema to the
cache
- TestUtils.waitForCondition(aVoid -> tableCache.getSchema(RAW_TABLE_NAME)
!= null, 10_000L,
- "Failed to add the schema to the cache");
+ Schema schema = addSchema(RAW_TABLE_NAME, tableCache);
// Schema can be accessed by the schema name, but not by the table name
because table config is not added yet
Schema expectedSchema = getExpectedSchema(RAW_TABLE_NAME);
- Map<String, String> expectedColumnMap = new HashMap<>();
- expectedColumnMap.put(isCaseInsensitive ? "testcolumn" : "testColumn",
"testColumn");
- expectedColumnMap.put(isCaseInsensitive ? "$docid" : "$docId", "$docId");
- expectedColumnMap.put(isCaseInsensitive ? "$hostname" : "$hostName",
"$hostName");
- expectedColumnMap.put(isCaseInsensitive ? "$segmentname" : "$segmentName",
"$segmentName");
+ Map<String, String> expectedColumnMap =
getExpectedColumnMap(isCaseInsensitive);
assertEquals(tableCache.getSchema(RAW_TABLE_NAME), expectedSchema);
assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME),
expectedColumnMap);
// Case-insensitive table name are handled based on the table config
instead of the schema
@@ -114,6 +106,7 @@ public class TableCacheTest {
assertEquals(tableCache.getColumnNameMap(RAW_TABLE_NAME),
expectedColumnMap);
// Add logical table
+ addSchema(LOGICAL_TABLE_NAME, tableCache);
LogicalTableConfig logicalTableConfig =
getLogicalTableConfig(LOGICAL_TABLE_NAME, List.of(OFFLINE_TABLE_NAME));
TEST_INSTANCE.getHelixResourceManager().addLogicalTableConfig(logicalTableConfig);
// Wait for at most 10 seconds for the callback to add the logical table
to the cache
@@ -123,12 +116,11 @@ public class TableCacheTest {
if (isCaseInsensitive) {
assertEquals(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME),
LOGICAL_TABLE_NAME);
assertEquals(tableCache.getLogicalTableConfig(MANGLED_LOGICAL_TABLE_NAME),
logicalTableConfig);
- assertEquals(tableCache.getSchema(MANGLED_LOGICAL_TABLE_NAME),
expectedSchema);
} else {
assertNull(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME));
}
assertEquals(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME),
logicalTableConfig);
- assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME), expectedSchema);
+ assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME),
getExpectedSchema(LOGICAL_TABLE_NAME));
// Register the change listeners
TestTableConfigChangeListener tableConfigChangeListener = new
TestTableConfigChangeListener();
@@ -138,8 +130,9 @@ public class TableCacheTest {
TestSchemaChangeListener schemaChangeListener = new
TestSchemaChangeListener();
assertTrue(tableCache.registerSchemaChangeListener(schemaChangeListener));
- assertEquals(schemaChangeListener._schemaList.size(), 1);
- assertEquals(schemaChangeListener._schemaList.get(0), expectedSchema);
+ assertEquals(schemaChangeListener._schemaList.size(), 2);
+ assertTrue(schemaChangeListener._schemaList.get(0).equals(expectedSchema)
+ || schemaChangeListener._schemaList.get(1).equals(expectedSchema));
TestLogicalTableConfigChangeListener logicalTableConfigChangeListener =
new TestLogicalTableConfigChangeListener();
assertTrue(tableCache.registerLogicalTableConfigChangeListener(logicalTableConfigChangeListener));
@@ -164,8 +157,9 @@ public class TableCacheTest {
expectedColumnMap.put(isCaseInsensitive ? "newcolumn" : "newColumn",
"newColumn");
TestUtils.waitForCondition(aVoid -> {
assertNotNull(tableCache.getSchema(RAW_TABLE_NAME));
- assertEquals(schemaChangeListener._schemaList.size(), 1);
- return schemaChangeListener._schemaList.get(0).equals(expectedSchema);
+ assertEquals(schemaChangeListener._schemaList.size(), 2);
+ return schemaChangeListener._schemaList.get(0).equals(expectedSchema)
+ || schemaChangeListener._schemaList.get(1).equals(expectedSchema);
}, 10_000L, "Failed to update the schema in the cache");
// Schema can be accessed by both the schema name and the raw table name
assertEquals(tableCache.getSchema(RAW_TABLE_NAME), expectedSchema);
@@ -206,12 +200,9 @@ public class TableCacheTest {
TEST_INSTANCE.waitForEVToAppear(OFFLINE_TABLE_NAME);
// Update logical table config (create schema and table config for
anotherTable)
- Schema anotherTableSchema =
- new
Schema.SchemaBuilder().setSchemaName(ANOTHER_TABLE).addSingleValueDimension("testColumn",
DataType.INT)
- .build();
+ addSchema(ANOTHER_TABLE, tableCache);
TableConfig anotherTableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(ANOTHER_TABLE_OFFLINE).build();
- TEST_INSTANCE.getHelixResourceManager().addSchema(anotherTableSchema,
false, false);
TEST_INSTANCE.getHelixResourceManager().addTable(anotherTableConfig);
TEST_INSTANCE.waitForEVToAppear(ANOTHER_TABLE_OFFLINE);
// Wait for at most 10 seconds for the callback to add the table config to
the cache
@@ -229,12 +220,11 @@ public class TableCacheTest {
if (isCaseInsensitive) {
assertEquals(tableCache.getLogicalTableConfig(MANGLED_LOGICAL_TABLE_NAME),
logicalTableConfig);
- assertEquals(tableCache.getSchema(MANGLED_LOGICAL_TABLE_NAME),
getExpectedSchema(ANOTHER_TABLE));
} else {
assertNull(tableCache.getActualLogicalTableName(MANGLED_LOGICAL_TABLE_NAME));
}
assertEquals(tableCache.getLogicalTableConfig(LOGICAL_TABLE_NAME),
logicalTableConfig);
- assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME),
getExpectedSchema(ANOTHER_TABLE));
+ assertEquals(tableCache.getSchema(LOGICAL_TABLE_NAME),
getExpectedSchema(LOGICAL_TABLE_NAME));
// Remove the table config
TEST_INSTANCE.getHelixResourceManager().deleteOfflineTable(RAW_TABLE_NAME);
@@ -254,6 +244,7 @@ public class TableCacheTest {
// Remove the schema
TEST_INSTANCE.getHelixResourceManager().deleteSchema(RAW_TABLE_NAME);
TEST_INSTANCE.getHelixResourceManager().deleteSchema(ANOTHER_TABLE);
+ TEST_INSTANCE.getHelixResourceManager().deleteSchema(LOGICAL_TABLE_NAME);
// Wait for at most 10 seconds for the callback to remove the schema from
the cache
// NOTE:
// - Verify if the callback is fully done by checking the schema change
lister because it is the last step of the
@@ -285,6 +276,27 @@ public class TableCacheTest {
TEST_INSTANCE.waitForEVToDisappear(ANOTHER_TABLE_OFFLINE);
}
+ private static Schema addSchema(String rawTableName, TableCache tableCache)
+ throws SchemaAlreadyExistsException, SchemaBackwardIncompatibleException
{
+ Schema schema =
+ new
Schema.SchemaBuilder().setSchemaName(rawTableName).addSingleValueDimension("testColumn",
DataType.INT)
+ .build();
+ TEST_INSTANCE.getHelixResourceManager().addSchema(schema, false, false);
+ // Wait for at most 10 seconds for the callback to add the schema to the
cache
+ TestUtils.waitForCondition(aVoid -> tableCache.getSchema(rawTableName) !=
null,
+ 10_000L, "Failed to add the schema to the cache");
+ return schema;
+ }
+
+ private static Map<String, String> getExpectedColumnMap(boolean
isCaseInsensitive) {
+ Map<String, String> expectedColumnMap = new HashMap<>();
+ expectedColumnMap.put(isCaseInsensitive ? "testcolumn" : "testColumn",
"testColumn");
+ expectedColumnMap.put(isCaseInsensitive ? "$docid" : "$docId", "$docId");
+ expectedColumnMap.put(isCaseInsensitive ? "$hostname" : "$hostName",
"$hostName");
+ expectedColumnMap.put(isCaseInsensitive ? "$segmentname" : "$segmentName",
"$segmentName");
+ return expectedColumnMap;
+ }
+
private static Schema getExpectedSchema(String tableName) {
return new
Schema.SchemaBuilder().setSchemaName(tableName).addSingleValueDimension("testColumn",
DataType.INT)
.addSingleValueDimension(BuiltInVirtualColumn.DOCID, DataType.INT)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]