This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ab4f333de8 Enforce schema for all tables (#15333)
ab4f333de8 is described below
commit ab4f333de8ddce6bd1349570472943f96dac7c7e
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Mar 25 16:15:36 2025 -0600
Enforce schema for all tables (#15333)
---
.../segmentpruner/SegmentPrunerFactory.java | 2 +-
.../pinot/common/config/provider/TableCache.java | 36 +--
.../pinot/common/metadata/ZKMetadataProvider.java | 51 +---
.../pinot/common/metrics/ControllerGauge.java | 15 +-
.../org/apache/pinot/common/utils/SchemaUtils.java | 14 +-
.../pinot/controller/BaseControllerStarter.java | 118 +++------
.../apache/pinot/controller/ControllerConf.java | 17 +-
.../api/resources/PinotSchemaRestletResource.java | 43 ++-
.../api/resources/PinotTableRestletResource.java | 48 +---
.../api/resources/TableAndSchemaConfig.java | 4 +-
.../controller/helix/ControllerRequestClient.java | 11 +
.../helix/core/PinotHelixResourceManager.java | 15 +-
.../pinot/controller/helix/ControllerTest.java | 5 +
.../cleanup/SchemaCleanupTaskStatelessTest.java | 288 ---------------------
.../core/data/manager/BaseTableDataManager.java | 10 +-
.../tests/OfflineClusterIntegrationTest.java | 5 +-
...RefreshSegmentMinionClusterIntegrationTest.java | 11 +-
.../RefreshSegmentTaskGenerator.java | 4 +-
...RealtimeToOfflineSegmentsTaskGeneratorTest.java | 3 -
.../local/data/manager/TableDataManager.java | 2 +-
.../segment/local/utils/TableConfigUtils.java | 105 +++-----
.../segment/local/utils/TableConfigUtilsTest.java | 58 +++--
.../server/predownload/PredownloadZKClient.java | 16 +-
.../SegmentsValidationAndRetentionConfig.java | 15 --
24 files changed, 206 insertions(+), 690 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
index 9efd86764e..6fc8a150c1 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
@@ -133,7 +133,7 @@ public class SegmentPrunerFactory {
LOGGER.warn("Cannot enable time range pruning without time column for
table: {}", tableNameWithType);
return null;
}
- Schema schema = ZKMetadataProvider.getTableSchema(propertyStore,
tableConfig);
+ Schema schema = ZKMetadataProvider.getTableSchema(propertyStore,
tableNameWithType);
if (schema == null) {
LOGGER.warn("Cannot enable time range pruning without schema for table:
{}", tableNameWithType);
return null;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
index a193fc19db..17d953abec 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java
@@ -81,9 +81,6 @@ public class TableCache implements PinotConfigProvider {
private final ZkTableConfigChangeListener _zkTableConfigChangeListener = new
ZkTableConfigChangeListener();
// Key is table name with type suffix, value is table config info
private final Map<String, TableConfigInfo> _tableConfigInfoMap = new
ConcurrentHashMap<>();
- // Key is table name (with or without type suffix), value is schema name
- // It only stores table with schema name not matching the raw table name
- private final Map<String, String> _schemaNameMap = new ConcurrentHashMap<>();
// Key is lower case table name (with or without type suffix), value is
actual table name
// For case-insensitive mode only
private final Map<String, String> _tableNameMap = new ConcurrentHashMap<>();
@@ -175,8 +172,7 @@ public class TableCache implements PinotConfigProvider {
*/
@Nullable
public Map<String, String> getColumnNameMap(String rawTableName) {
- String schemaName = _schemaNameMap.getOrDefault(rawTableName,
rawTableName);
- SchemaInfo schemaInfo = _schemaInfoMap.getOrDefault(schemaName,
_schemaInfoMap.get(rawTableName));
+ SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
return schemaInfo != null ? schemaInfo._columnNameMap : null;
}
@@ -225,8 +221,7 @@ public class TableCache implements PinotConfigProvider {
@Nullable
@Override
public Schema getSchema(String rawTableName) {
- String schemaName = _schemaNameMap.getOrDefault(rawTableName,
rawTableName);
- SchemaInfo schemaInfo = _schemaInfoMap.get(schemaName);
+ SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
return schemaInfo != null ? schemaInfo._schema : null;
}
@@ -262,17 +257,8 @@ public class TableCache implements PinotConfigProvider {
throws IOException {
TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord);
String tableNameWithType = tableConfig.getTableName();
- _tableConfigInfoMap.put(tableNameWithType, new
TableConfigInfo(tableConfig));
-
- String schemaName = tableConfig.getValidationConfig().getSchemaName();
String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
- if (schemaName != null && !schemaName.equals(rawTableName)) {
- _schemaNameMap.put(tableNameWithType, schemaName);
- _schemaNameMap.put(rawTableName, schemaName);
- } else {
- removeSchemaName(tableNameWithType);
- }
-
+ _tableConfigInfoMap.put(tableNameWithType, new
TableConfigInfo(tableConfig));
if (_ignoreCase) {
_tableNameMap.put(tableNameWithType.toLowerCase(), tableNameWithType);
_tableNameMap.put(rawTableName.toLowerCase(), rawTableName);
@@ -287,7 +273,6 @@ public class TableCache implements PinotConfigProvider {
String tableNameWithType =
path.substring(TABLE_CONFIG_PATH_PREFIX.length());
String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
_tableConfigInfoMap.remove(tableNameWithType);
- removeSchemaName(tableNameWithType);
if (_ignoreCase) {
_tableNameMap.remove(tableNameWithType.toLowerCase());
String lowerCaseRawTableName = rawTableName.toLowerCase();
@@ -314,21 +299,6 @@ public class TableCache implements PinotConfigProvider {
}
}
- private void removeSchemaName(String tableNameWithType) {
- if (_schemaNameMap.remove(tableNameWithType) != null) {
- String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
- if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
- if
(!_schemaNameMap.containsKey(TableNameBuilder.REALTIME.tableNameWithType(rawTableName)))
{
- _schemaNameMap.remove(rawTableName);
- }
- } else {
- if
(!_schemaNameMap.containsKey(TableNameBuilder.OFFLINE.tableNameWithType(rawTableName)))
{
- _schemaNameMap.remove(rawTableName);
- }
- }
- }
- }
-
private void addSchemas(List<String> paths) {
// Subscribe data changes before reading the data to avoid missing changes
for (String path : paths) {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 37ba1499e3..b95e76f19c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -44,7 +44,6 @@ import org.apache.pinot.spi.config.ConfigUtils;
import org.apache.pinot.spi.config.DatabaseConfig;
import org.apache.pinot.spi.config.table.QuotaConfig;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.user.UserConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -630,60 +629,16 @@ public class ZKMetadataProvider {
*/
@Nullable
public static Schema getTableSchema(ZkHelixPropertyStore<ZNRecord>
propertyStore, String tableName) {
- String rawTableName = TableNameBuilder.extractRawTableName(tableName);
- Schema schema = getSchema(propertyStore, rawTableName);
- if (schema != null) {
- return schema;
- }
-
- // For backward compatible where schema name is not the same as raw table
name
- TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
- // Try to fetch realtime schema first
- if (tableType == null || tableType == TableType.REALTIME) {
- TableConfig realtimeTableConfig = getRealtimeTableConfig(propertyStore,
tableName);
- if (realtimeTableConfig != null) {
- String realtimeSchemaNameFromValidationConfig =
realtimeTableConfig.getValidationConfig().getSchemaName();
- if (realtimeSchemaNameFromValidationConfig != null) {
- schema = getSchema(propertyStore,
realtimeSchemaNameFromValidationConfig);
- }
- }
- }
- // Try to fetch offline schema if realtime schema does not exist
- if (schema == null && (tableType == null || tableType ==
TableType.OFFLINE)) {
- TableConfig offlineTableConfig = getOfflineTableConfig(propertyStore,
tableName);
- if (offlineTableConfig != null) {
- String offlineSchemaNameFromValidationConfig =
offlineTableConfig.getValidationConfig().getSchemaName();
- if (offlineSchemaNameFromValidationConfig != null) {
- schema = getSchema(propertyStore,
offlineSchemaNameFromValidationConfig);
- }
- }
- }
- if (schema != null && LOGGER.isDebugEnabled()) {
- LOGGER.debug("Schema name does not match raw table name, schema name:
{}, raw table name: {}",
- schema.getSchemaName(),
TableNameBuilder.extractRawTableName(tableName));
- }
- return schema;
+ return getSchema(propertyStore,
TableNameBuilder.extractRawTableName(tableName));
}
/**
* Get the schema associated with the given table.
*/
+ @Deprecated
@Nullable
public static Schema getTableSchema(ZkHelixPropertyStore<ZNRecord>
propertyStore, TableConfig tableConfig) {
- String rawTableName =
TableNameBuilder.extractRawTableName(tableConfig.getTableName());
- Schema schema = getSchema(propertyStore, rawTableName);
- if (schema != null) {
- return schema;
- }
- String schemaNameFromTableConfig =
tableConfig.getValidationConfig().getSchemaName();
- if (schemaNameFromTableConfig != null) {
- schema = getSchema(propertyStore, schemaNameFromTableConfig);
- }
- if (schema != null && LOGGER.isDebugEnabled()) {
- LOGGER.debug("Schema name does not match raw table name, schema name:
{}, raw table name: {}",
- schemaNameFromTableConfig, rawTableName);
- }
- return schema;
+ return getTableSchema(propertyStore, tableConfig.getTableName());
}
/**
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index fa27e60f4b..9ea6008b3f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -151,21 +151,12 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
// Consumption availability lag in ms at a partition level
MAX_RECORD_AVAILABILITY_LAG_MS("maxRecordAvailabilityLagMs", false),
- // Number of table schema got misconfigured
- MISCONFIGURED_SCHEMA_TABLE_COUNT("misconfiguredSchemaTableCount", true),
+ // Number of table without table config
+ TABLE_WITHOUT_TABLE_CONFIG_COUNT("tableWithoutTableConfigCount", true),
- // Number of table without schema
+ // Number of table with table config but without schema
TABLE_WITHOUT_SCHEMA_COUNT("tableWithoutSchemaCount", true),
- // Number of table schema got fixed
- FIXED_SCHEMA_TABLE_COUNT("fixedSchemaTableCount", true),
-
- // Number of tables that we want to fix but failed to copy schema from old
schema name to new schema name
- FAILED_TO_COPY_SCHEMA_COUNT("failedToCopySchemaCount", true),
-
- // Number of tables that we want to fix but failed to update table config
- FAILED_TO_UPDATE_TABLE_CONFIG_COUNT("failedToUpdateTableConfigCount", true),
-
LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_QUEUE_SIZE("LLCSegmentDeepStoreUploadRetryQueueSize",
false),
TABLE_CONSUMPTION_PAUSED("tableConsumptionPaused", false),
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/SchemaUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/SchemaUtils.java
index 4cadf09bea..2800441fa6 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SchemaUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/SchemaUtils.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.io.IOUtils;
import org.apache.hc.client5.http.classic.methods.HttpDelete;
@@ -58,7 +57,7 @@ public class SchemaUtils {
/**
* Fetch {@link Schema} from a {@link ZNRecord}.
*/
- public static Schema fromZNRecord(@Nonnull ZNRecord record)
+ public static Schema fromZNRecord(ZNRecord record)
throws IOException {
String schemaJSON = record.getSimpleField("schemaJSON");
return Schema.fromString(schemaJSON);
@@ -67,7 +66,7 @@ public class SchemaUtils {
/**
* Wrap {@link Schema} into a {@link ZNRecord}.
*/
- public static ZNRecord toZNRecord(@Nonnull Schema schema) {
+ public static ZNRecord toZNRecord(Schema schema) {
ZNRecord record = new ZNRecord(schema.getSchemaName());
record.setSimpleField("schemaJSON", schema.toSingleLineJsonString());
return record;
@@ -79,7 +78,8 @@ public class SchemaUtils {
* @return schema on success.
* <P><code>null</code> on failure.
*/
- public static @Nullable Schema getSchema(@Nonnull String host, int port,
@Nonnull String schemaName) {
+ @Nullable
+ public static Schema getSchema(String host, int port, String schemaName) {
Preconditions.checkNotNull(host);
Preconditions.checkNotNull(schemaName);
@@ -112,7 +112,7 @@ public class SchemaUtils {
* @return <code>true</code> on success.
* <P><code>false</code> on failure.
*/
- public static boolean postSchema(@Nonnull String host, int port, @Nonnull
Schema schema) {
+ public static boolean postSchema(String host, int port, Schema schema) {
Preconditions.checkNotNull(host);
Preconditions.checkNotNull(schema);
@@ -144,7 +144,7 @@ public class SchemaUtils {
* @return <code>true</code> on success.
* <P><code>false</code> on failure.
*/
- public static boolean deleteSchema(@Nonnull String host, int port, @Nonnull
String schemaName) {
+ public static boolean deleteSchema(String host, int port, String schemaName)
{
Preconditions.checkNotNull(host);
Preconditions.checkNotNull(schemaName);
@@ -172,7 +172,7 @@ public class SchemaUtils {
* @return <code>true</code> if two schemas equal to each other.
* <p><code>false</code>if two schemas do not equal to each other.
*/
- public static boolean equalsIgnoreVersion(@Nonnull Schema schema1, @Nonnull
Schema schema2) {
+ public static boolean equalsIgnoreVersion(Schema schema1, Schema schema2) {
Preconditions.checkNotNull(schema1);
Preconditions.checkNotNull(schema2);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 94c380cbfe..2e4954076c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -37,15 +37,12 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.hc.core5.http.io.SocketConfig;
import org.apache.hc.core5.util.Timeout;
-import org.apache.helix.AccessOption;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
@@ -73,7 +70,6 @@ import org.apache.pinot.common.minion.InMemoryTaskManagerStatusCache;
import org.apache.pinot.common.minion.TaskGeneratorMostRecentRunInfo;
import org.apache.pinot.common.minion.TaskManagerStatusCache;
import org.apache.pinot.common.utils.PinotAppConfigs;
-import org.apache.pinot.common.utils.SchemaUtils;
import org.apache.pinot.common.utils.ServiceStartableUtils;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
@@ -141,7 +137,6 @@ import org.apache.pinot.spi.services.ServiceStartable;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.InstanceTypeUtils;
import org.apache.pinot.spi.utils.NetUtils;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.slf4j.Logger;
@@ -593,9 +588,7 @@ public abstract class BaseControllerStarter implements ServiceStartable {
LOGGER.info("Starting controller admin application on: {}",
ListenerConfigUtil.toString(_listenerConfigs));
_adminApp.start(_listenerConfigs);
- // One time job to fix schema name in all tables
- // This method can be removed after the next major release.
- fixSchemaNameInTableConfig();
+ enforceTableConfigAndSchema();
_controllerMetrics.addCallbackGauge("dataDir.exists", () -> new
File(_config.getDataDir()).exists() ? 1L : 0L);
_controllerMetrics.addCallbackGauge("dataDir.fileOpLatencyMs", () -> {
@@ -626,91 +619,46 @@ public abstract class BaseControllerStarter implements ServiceStartable {
}
/**
- * This method is used to fix table/schema names.
- * TODO: in the next release, maybe 2.0.0, we can remove this method.
Meanwhile we can delete the orphan schemas
- * that has been existed longer than a certain time period.
- *
+ * Scan all table resources in the cluster and ensure table config and
schema exist for each table.
+ * TODO: Cleanup orphan table config and schema
*/
- @VisibleForTesting
- public void fixSchemaNameInTableConfig() {
- AtomicInteger misconfiguredTableCount = new AtomicInteger();
- AtomicInteger tableWithoutSchemaCount = new AtomicInteger();
- AtomicInteger fixedSchemaTableCount = new AtomicInteger();
- AtomicInteger failedToCopySchemaCount = new AtomicInteger();
- AtomicInteger failedToUpdateTableConfigCount = new AtomicInteger();
+ private void enforceTableConfigAndSchema() {
ZkHelixPropertyStore<ZNRecord> propertyStore =
_helixResourceManager.getPropertyStore();
-
- _helixResourceManager.getAllTables().forEach(tableNameWithType -> {
- Pair<TableConfig, Integer> tableConfigWithVersion =
- ZKMetadataProvider.getTableConfigWithVersion(propertyStore,
tableNameWithType);
- if (tableConfigWithVersion == null) {
- // This might due to table deletion, just log it here.
- LOGGER.warn("Failed to find table config for table: {}, the table
likely already got deleted",
- tableNameWithType);
- return;
+ List<String> tablesWithoutTableConfig = new ArrayList<>();
+ List<String> tablesWithoutSchema = new ArrayList<>();
+ for (String tableNameWithType : _helixResourceManager.getAllTables()) {
+ TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(propertyStore, tableNameWithType);
+ if (tableConfig == null) {
+ tablesWithoutTableConfig.add(tableNameWithType);
+ continue;
}
- TableConfig tableConfig = tableConfigWithVersion.getLeft();
- String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
- String schemaPath =
ZKMetadataProvider.constructPropertyStorePathForSchema(rawTableName);
- boolean schemaExists = propertyStore.exists(schemaPath,
AccessOption.PERSISTENT);
- String existSchemaName =
tableConfig.getValidationConfig().getSchemaName();
- if (existSchemaName == null || existSchemaName.equals(rawTableName)) {
- // Although the table config is valid, we still need to ensure the
schema exists
- if (!schemaExists) {
- LOGGER.warn("Failed to find schema for table: {}",
tableNameWithType);
- tableWithoutSchemaCount.getAndIncrement();
- return;
- }
- // Table config is already in good status
- return;
+ Schema schema = ZKMetadataProvider.getTableSchema(propertyStore,
tableNameWithType);
+ if (schema == null) {
+ tablesWithoutSchema.add(tableNameWithType);
}
- misconfiguredTableCount.getAndIncrement();
- if (schemaExists) {
- // If a schema named `rawTableName` already exists, then likely this
is a misconfiguration.
- // Reset schema name in table config to null to let the table point to
the existing schema.
- LOGGER.warn("Schema: {} already exists, fix the schema name in table
config from {} to null", rawTableName,
- existSchemaName);
+ }
+ if (!tablesWithoutTableConfig.isEmpty()) {
+ LOGGER.error("[CRITICAL!!!] Failed to find table config for tables: {}",
tablesWithoutTableConfig);
+ if (_config.isExitOnTableConfigCheckFailure()) {
+ throw new IllegalStateException("Failed to find table config for tables: " + tablesWithoutTableConfig
+ + ", exiting! Please set controller.startup.exitOnTableConfigCheckFailure=false to not exit and fix these "
+ + "tables.");
} else {
- // Copy the schema current table referring to to `rawTableName` if it
does not exist
- Schema schema = _helixResourceManager.getSchema(existSchemaName);
- if (schema == null) {
- LOGGER.warn("Failed to find schema: {} for table: {}",
existSchemaName, tableNameWithType);
- tableWithoutSchemaCount.getAndIncrement();
- return;
- }
- schema.setSchemaName(rawTableName);
- if (propertyStore.create(schemaPath, SchemaUtils.toZNRecord(schema),
AccessOption.PERSISTENT)) {
- LOGGER.info("Copied schema: {} to {}", existSchemaName,
rawTableName);
- } else {
- LOGGER.warn("Failed to copy schema: {} to {}", existSchemaName,
rawTableName);
- failedToCopySchemaCount.getAndIncrement();
- return;
- }
+ _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.TABLE_WITHOUT_TABLE_CONFIG_COUNT,
+ tablesWithoutTableConfig.size());
}
- // Update table config to remove schema name
- tableConfig.getValidationConfig().setSchemaName(null);
- if (ZKMetadataProvider.setTableConfig(propertyStore, tableConfig,
tableConfigWithVersion.getRight())) {
- LOGGER.info("Removed schema name from table config for table: {}",
tableNameWithType);
- fixedSchemaTableCount.getAndIncrement();
+ }
+ if (!tablesWithoutSchema.isEmpty()) {
+ LOGGER.error("[CRITICAL!!!] Failed to find schema for tables: {}",
tablesWithoutSchema);
+ if (_config.isExitOnSchemaCheckFailure()) {
+ throw new IllegalStateException("Failed to find schema for tables: " + tablesWithoutSchema
+ + ", exiting! Please set controller.startup.exitOnSchemaCheckFailure=false to not exit and fix these "
+ + "tables.");
} else {
- LOGGER.warn("Failed to update table config for table: {}",
tableNameWithType);
- failedToUpdateTableConfigCount.getAndIncrement();
+ _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT,
+ tablesWithoutSchema.size());
}
- });
- LOGGER.info(
- "Found {} tables misconfigured, {} tables without schema. Successfully
fixed schema for {} tables, failed to "
- + "fix {} tables due to copy schema failure, failed to fix {}
tables due to update table config failure.",
- misconfiguredTableCount.get(), tableWithoutSchemaCount.get(),
fixedSchemaTableCount.get(),
- failedToCopySchemaCount.get(), failedToUpdateTableConfigCount.get());
-
-
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT,
- misconfiguredTableCount.get());
-
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT,
tableWithoutSchemaCount.get());
-
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FIXED_SCHEMA_TABLE_COUNT,
fixedSchemaTableCount.get());
-
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT,
- failedToCopySchemaCount.get());
-
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT,
- failedToUpdateTableConfigCount.get());
+ }
}
private ServiceStatus.ServiceStatusCallback
generateServiceStatusCallback(HelixManager helixManager) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index c76943773b..29ebb084f0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -362,6 +362,11 @@ public class ControllerConf extends PinotConfiguration {
public static final String ENFORCE_POOL_BASED_ASSIGNMENT_KEY =
"enforce.pool.based.assignment";
public static final boolean DEFAULT_ENFORCE_POOL_BASED_ASSIGNMENT = false;
+ public static final String EXIT_ON_TABLE_CONFIG_CHECK_FAILURE =
"controller.startup.exitOnTableConfigCheckFailure";
+ public static final boolean DEFAULT_EXIT_ON_TABLE_CONFIG_CHECK_FAILURE =
true;
+ public static final String EXIT_ON_SCHEMA_CHECK_FAILURE =
"controller.startup.exitOnSchemaCheckFailure";
+ public static final boolean DEFAULT_EXIT_ON_SCHEMA_CHECK_FAILURE = true;
+
public ControllerConf() {
super(new HashMap<>());
}
@@ -1205,12 +1210,20 @@ public class ControllerConf extends PinotConfiguration {
return getProperty(ENFORCE_POOL_BASED_ASSIGNMENT_KEY,
DEFAULT_ENFORCE_POOL_BASED_ASSIGNMENT);
}
+ public boolean isExitOnTableConfigCheckFailure() {
+ return getProperty(EXIT_ON_TABLE_CONFIG_CHECK_FAILURE,
DEFAULT_EXIT_ON_TABLE_CONFIG_CHECK_FAILURE);
+ }
+
+ public boolean isExitOnSchemaCheckFailure() {
+ return getProperty(EXIT_ON_SCHEMA_CHECK_FAILURE,
DEFAULT_EXIT_ON_SCHEMA_CHECK_FAILURE);
+ }
+
public void setEnableSwagger(boolean value) {
- setProperty(ControllerConf.CONSOLE_SWAGGER_ENABLE, value);
+ setProperty(CONSOLE_SWAGGER_ENABLE, value);
}
public boolean isEnableSwagger() {
- String enableSwagger = getProperty(ControllerConf.CONSOLE_SWAGGER_ENABLE);
+ String enableSwagger = getProperty(CONSOLE_SWAGGER_ENABLE);
return enableSwagger == null || Boolean.parseBoolean(enableSwagger);
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java
index d0a087c1d8..2e1d0d2c88 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java
@@ -206,15 +206,18 @@ public class PinotSchemaRestletResource {
})
public ConfigSuccessResponse updateSchema(
@ApiParam(value = "Name of the schema", required = true)
@PathParam("schemaName") String schemaName,
- @ApiParam(value = "Whether to reload the table if the new schema is
backward compatible") @DefaultValue("false")
- @QueryParam("reload") boolean reload, @Context HttpHeaders headers,
FormDataMultiPart multiPart) {
+ @ApiParam(value = "Whether to reload the table after updating the
schema") @DefaultValue("false")
+ @QueryParam("reload") boolean reload,
+ @ApiParam(value = "Whether to force update the schema even if the new
schema is backward incompatible")
+ @DefaultValue("false") @QueryParam("force") boolean force, @Context
HttpHeaders headers,
+ FormDataMultiPart multiPart) {
schemaName = DatabaseUtils.translateTableName(schemaName, headers);
Pair<Schema, Map<String, Object>> schemaAndUnrecognizedProps =
getSchemaAndUnrecognizedPropertiesFromMultiPart(multiPart);
Schema schema = schemaAndUnrecognizedProps.getLeft();
validateSchemaName(schema);
schema.setSchemaName(DatabaseUtils.translateTableName(schema.getSchemaName(),
headers));
- SuccessResponse successResponse = updateSchema(schemaName, schema, reload);
+ SuccessResponse successResponse = updateSchema(schemaName, schema, reload,
force);
return new ConfigSuccessResponse(successResponse.getStatus(),
schemaAndUnrecognizedProps.getRight());
}
@@ -233,15 +236,18 @@ public class PinotSchemaRestletResource {
})
public ConfigSuccessResponse updateSchema(
@ApiParam(value = "Name of the schema", required = true)
@PathParam("schemaName") String schemaName,
- @ApiParam(value = "Whether to reload the table if the new schema is
backward compatible") @DefaultValue("false")
- @QueryParam("reload") boolean reload, @Context HttpHeaders headers,
String schemaJsonString) {
+ @ApiParam(value = "Whether to reload the table after updating the
schema") @DefaultValue("false")
+ @QueryParam("reload") boolean reload,
+ @ApiParam(value = "Whether to force update the schema even if the new
schema is backward incompatible")
+ @DefaultValue("false") @QueryParam("force") boolean force, @Context
HttpHeaders headers,
+ String schemaJsonString) {
schemaName = DatabaseUtils.translateTableName(schemaName, headers);
Pair<Schema, Map<String, Object>> schemaAndUnrecognizedProps =
getSchemaAndUnrecognizedPropertiesFromJson(schemaJsonString);
Schema schema = schemaAndUnrecognizedProps.getLeft();
validateSchemaName(schema);
schema.setSchemaName(DatabaseUtils.translateTableName(schema.getSchemaName(),
headers));
- SuccessResponse successResponse = updateSchema(schemaName, schema, reload);
+ SuccessResponse successResponse = updateSchema(schemaName, schema, reload,
force);
return new ConfigSuccessResponse(successResponse.getStatus(),
schemaAndUnrecognizedProps.getRight());
}
@@ -446,7 +452,7 @@ public class PinotSchemaRestletResource {
* @param reload set to true to reload the tables using the schema, so
committed segments can pick up the new schema
* @return SuccessResponse
*/
- private SuccessResponse updateSchema(String schemaName, Schema schema,
boolean reload) {
+ private SuccessResponse updateSchema(String schemaName, Schema schema,
boolean reload, boolean force) {
validateSchemaInternal(schema);
if (!schemaName.equals(schema.getSchemaName())) {
@@ -457,7 +463,7 @@ public class PinotSchemaRestletResource {
}
try {
- _pinotHelixResourceManager.updateSchema(schema, reload, false);
+ _pinotHelixResourceManager.updateSchema(schema, reload, force);
// Best effort notification. If controller fails at this point, no
notification is given.
LOGGER.info("Notifying metadata event for updating schema: {}",
schemaName);
_metadataEventNotifierFactory.create().notifyOnSchemaEvents(schema,
SchemaEventType.UPDATE);
@@ -520,7 +526,6 @@ public class PinotSchemaRestletResource {
*/
private Pair<Schema, Map<String, Object>>
getSchemaAndUnrecognizedPropertiesFromJson(String schemaJsonString)
throws ControllerApplicationException {
- Pair<Schema, Map<String, Object>> schemaAndUnrecognizedProps;
try {
return
JsonUtils.stringToObjectAndUnrecognizedProperties(schemaJsonString,
Schema.class);
} catch (Exception e) {
@@ -537,24 +542,14 @@ public class PinotSchemaRestletResource {
}
// If the schema is associated with a table, we should not delete it.
- // TODO: Check OFFLINE tables as well. There are 2 side effects:
- // - Increases ZK read when there are lots of OFFLINE tables
- // - Behavior change since we don't allow deleting schema for
OFFLINE tables
- List<String> realtimeTables =
_pinotHelixResourceManager.getAllRealtimeTables();
- for (String realtimeTableName : realtimeTables) {
- if
(schemaName.equals(TableNameBuilder.extractRawTableName(realtimeTableName))) {
+ String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(schemaName);
+ String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(schemaName);
+ for (String tableNameWithType : new String[]{offlineTableName,
realtimeTableName}) {
+ if (_pinotHelixResourceManager.hasTable(tableNameWithType)) {
throw new ControllerApplicationException(LOGGER,
- String.format("Cannot delete schema %s, as it is associated with
table %s", schemaName, realtimeTableName),
+ String.format("Cannot delete schema %s, as it is associated with
table %s", schemaName, tableNameWithType),
Response.Status.CONFLICT);
}
- TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(realtimeTableName);
- if (tableConfig != null) {
- if
(schemaName.equals(tableConfig.getValidationConfig().getSchemaName())) {
- throw new ControllerApplicationException(LOGGER,
- String.format("Cannot delete schema %s, as it is associated with
table %s", schemaName,
- realtimeTableName), Response.Status.CONFLICT);
- }
- }
}
LOGGER.info("Trying to delete schema {}", schemaName);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index fed660e988..eb604b55b5 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -112,7 +112,6 @@ import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.ManualAuthorization;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.segment.local.utils.TableConfigUtils;
-import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableStatsHumanReadable;
import org.apache.pinot.spi.config.table.TableStatus;
@@ -194,9 +193,8 @@ public class PinotTableRestletResource {
HttpClientConnectionManager _connectionManager;
/**
- * API to create a table. Before adding, validations will be done (min
number of replicas,
- * checking offline and realtime table configs match, checking for tenants
existing)
- * @param tableConfigStr
+ * API to create a table. Before adding, validations will be done (min
number of replicas, checking offline and
+ * realtime table configs match, checking for tenants existing).
*/
@POST
@Produces(MediaType.APPLICATION_JSON)
@@ -218,14 +216,13 @@ public class PinotTableRestletResource {
tableConfig = tableConfigAndUnrecognizedProperties.getLeft();
tableNameWithType =
DatabaseUtils.translateTableName(tableConfig.getTableName(), httpHeaders);
tableConfig.setTableName(tableNameWithType);
- // Handle legacy config
- handleLegacySchemaConfig(tableConfig, httpHeaders);
// validate permission
ResourceUtils.checkPermissionAndAccess(tableNameWithType, request,
httpHeaders,
AccessType.CREATE, Actions.Table.CREATE_TABLE,
_accessControlFactory, LOGGER);
- schema = _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
+ schema = _pinotHelixResourceManager.getTableSchema(tableNameWithType);
+ Preconditions.checkState(schema != null, "Failed to find schema for
table: %s", tableNameWithType);
TableConfigTunerUtils.applyTunerConfigs(_pinotHelixResourceManager,
tableConfig, schema, Collections.emptyMap());
@@ -490,8 +487,6 @@ public class PinotTableRestletResource {
tableConfig = tableConfigAndUnrecognizedProperties.getLeft();
tableNameWithType =
DatabaseUtils.translateTableName(tableConfig.getTableName(), headers);
tableConfig.setTableName(tableNameWithType);
- // Handle legacy config
- handleLegacySchemaConfig(tableConfig, headers);
String tableNameFromPath = DatabaseUtils.translateTableName(
TableNameBuilder.forType(tableConfig.getTableType()).tableNameWithType(tableName),
headers);
if (!tableNameFromPath.equals(tableNameWithType)) {
@@ -500,7 +495,8 @@ public class PinotTableRestletResource {
Response.Status.BAD_REQUEST);
}
- schema = _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
+ schema = _pinotHelixResourceManager.getTableSchema(tableNameWithType);
+ Preconditions.checkState(schema != null, "Failed to find schema for
table: %s", tableNameWithType);
TableConfigUtils.validate(tableConfig, schema, typesToSkip);
} catch (Exception e) {
String msg = String.format("Invalid table config: %s with error: %s",
tableName, e.getMessage());
@@ -559,24 +555,22 @@ public class PinotTableRestletResource {
String tableNameWithType =
DatabaseUtils.translateTableName(tableConfig.getTableName(), httpHeaders);
tableConfig.setTableName(tableNameWithType);
- // Handle legacy config
- handleLegacySchemaConfig(tableConfig, httpHeaders);
-
// validate permission
ResourceUtils.checkPermissionAndAccess(tableNameWithType, request,
httpHeaders,
AccessType.READ, Actions.Table.VALIDATE_TABLE_CONFIGS,
_accessControlFactory, LOGGER);
- ObjectNode validationResponse =
- validateConfig(tableConfig,
_pinotHelixResourceManager.getSchemaForTableConfig(tableConfig), typesToSkip);
+ ObjectNode validationResponse = validateConfig(tableConfig, typesToSkip);
validationResponse.set("unrecognizedProperties",
JsonUtils.objectToJsonNode(tableConfigAndUnrecognizedProperties.getRight()));
return validationResponse;
}
- private ObjectNode validateConfig(TableConfig tableConfig, Schema schema,
@Nullable String typesToSkip) {
+ private ObjectNode validateConfig(TableConfig tableConfig, @Nullable String
typesToSkip) {
+ String tableNameWithType = tableConfig.getTableName();
try {
+ Schema schema =
_pinotHelixResourceManager.getTableSchema(tableNameWithType);
if (schema == null) {
- throw new SchemaNotFoundException("Got empty schema");
+ throw new SchemaNotFoundException("Failed to find schema for table: "
+ tableNameWithType);
}
TableConfigUtils.validate(tableConfig, schema, typesToSkip);
TaskConfigUtils.validateTaskConfigs(tableConfig, schema,
_pinotTaskManager, typesToSkip);
@@ -588,7 +582,7 @@ public class PinotTableRestletResource {
}
return tableConfigValidateStr;
} catch (Exception e) {
- String msg = String.format("Invalid table config: %s. %s",
tableConfig.getTableName(), e.getMessage());
+ String msg = String.format("Invalid table config: %s. %s",
tableNameWithType, e.getMessage());
throw new ControllerApplicationException(LOGGER, msg,
Response.Status.BAD_REQUEST, e);
}
}
@@ -1279,24 +1273,6 @@ public class PinotTableRestletResource {
return timeBoundaryMs;
}
- /**
- * Handles the legacy schema configuration for a given table configuration.
- * This method updates the schema name in the validation configuration of
the table config
- * to ensure it is correctly translated based on the provided HTTP headers.
- * This is necessary to maintain compatibility with older configurations
that may not
- * have the schema name properly set or formatted.
- *
- * @param tableConfig The {@link TableConfig} object containing the table
configuration.
- * @param httpHeaders The {@link HttpHeaders} object containing the HTTP
headers, used to
- * translate the schema name if necessary.
- */
- private void handleLegacySchemaConfig(TableConfig tableConfig, HttpHeaders
httpHeaders) {
- SegmentsValidationAndRetentionConfig validationConfig =
tableConfig.getValidationConfig();
- if (validationConfig.getSchemaName() != null) {
-
validationConfig.setSchemaName(DatabaseUtils.translateTableName(validationConfig.getSchemaName(),
httpHeaders));
- }
- }
-
/**
* Try to calculate the instance partitions for the given table config.
Throws exception if it fails.
*/
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableAndSchemaConfig.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableAndSchemaConfig.java
index a4a76d1527..a2f262b6fb 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableAndSchemaConfig.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableAndSchemaConfig.java
@@ -21,7 +21,6 @@ package org.apache.pinot.controller.api.resources;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
-import javax.annotation.Nullable;
import org.apache.pinot.spi.config.TableConfigs;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
@@ -39,7 +38,7 @@ public class TableAndSchemaConfig {
@JsonCreator
public TableAndSchemaConfig(@JsonProperty(value = "tableConfig", required =
true) TableConfig tableConfig,
- @JsonProperty("schema") @Nullable Schema schema) {
+ @JsonProperty(value = "schema", required = true) Schema schema) {
_tableConfig = tableConfig;
_schema = schema;
}
@@ -48,7 +47,6 @@ public class TableAndSchemaConfig {
return _tableConfig;
}
- @Nullable
public Schema getSchema() {
return _schema;
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
index 754367fef5..4a8c54993e 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
@@ -105,6 +105,17 @@ public class ControllerRequestClient {
}
}
+ public void forceUpdateSchema(Schema schema)
+ throws IOException {
+ String url =
_controllerRequestURLBuilder.forSchemaUpdate(schema.getSchemaName()) +
"?force=true";
+ try {
+ HttpClient.wrapAndThrowHttpException(
+ _httpClient.sendMultipartPutRequest(url,
schema.toSingleLineJsonString(), _headers));
+ } catch (HttpErrorStatusException e) {
+ throw new IOException(e);
+ }
+ }
+
public void deleteSchema(String schemaName)
throws IOException {
String url = _controllerRequestURLBuilder.forSchemaDelete(schemaName);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index f125854976..c8478c25c5 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1630,21 +1630,10 @@ public class PinotHelixResourceManager {
return ZKMetadataProvider.getTableSchema(_propertyStore, tableName);
}
- /**
- * Find schema with same name as rawTableName. If not found, find schema
using schemaName in validationConfig.
- * For OFFLINE table, it is possible that schema was not uploaded before
creating the table. Hence for OFFLINE,
- * this method can return null.
- */
+ @Deprecated
@Nullable
public Schema getSchemaForTableConfig(TableConfig tableConfig) {
- Schema schema =
getSchema(TableNameBuilder.extractRawTableName(tableConfig.getTableName()));
- if (schema == null) {
- String schemaName = tableConfig.getValidationConfig().getSchemaName();
- if (schemaName != null) {
- schema = getSchema(schemaName);
- }
- }
- return schema;
+ return ZKMetadataProvider.getTableSchema(_propertyStore, tableConfig);
}
public List<String> getSchemaNames() {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 7a00a4d8e4..293f649fa9 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -664,6 +664,11 @@ public class ControllerTest {
getControllerRequestClient().updateSchema(schema);
}
+ public void forceUpdateSchema(Schema schema)
+ throws IOException {
+ getControllerRequestClient().forceUpdateSchema(schema);
+ }
+
public Schema getSchema(String schemaName) {
Schema schema = _helixResourceManager.getSchema(schemaName);
assertNotNull(schema);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/SchemaCleanupTaskStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/SchemaCleanupTaskStatelessTest.java
deleted file mode 100644
index 19c4756634..0000000000
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/SchemaCleanupTaskStatelessTest.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix.core.cleanup;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.pinot.common.metrics.ControllerGauge;
-import org.apache.pinot.common.metrics.MetricValueUtils;
-import org.apache.pinot.common.utils.config.TagNameUtils;
-import org.apache.pinot.controller.BaseControllerStarter;
-import org.apache.pinot.controller.helix.ControllerTest;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.metrics.PinotMetricUtils;
-import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.NetUtils;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
-
-
-/**
- * This test can be deleted once {@link
BaseControllerStarter#fixSchemaNameInTableConfig()} is deleted. Likely in 2.0.0.
- */
-@Test(groups = "stateless")
-public class SchemaCleanupTaskStatelessTest extends ControllerTest {
- @BeforeClass
- public void setup()
- throws Exception {
- startZk();
- startController();
- startFakeBroker();
- startFakeServer();
- }
-
- private void startFakeBroker()
- throws Exception {
- String brokerInstance = CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE +
NetUtils.getHostAddress() + "_"
- + CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT;
-
- // Create server instance with the fake server state model
- HelixManager brokerHelixManager =
- HelixManagerFactory.getZKHelixManager(getHelixClusterName(),
brokerInstance, InstanceType.PARTICIPANT,
- getZkUrl());
- brokerHelixManager.connect();
-
- // Add Helix tag to the server
-
brokerHelixManager.getClusterManagmentTool().addInstanceTag(getHelixClusterName(),
brokerInstance,
- TagNameUtils.getBrokerTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME));
- }
-
- private void startFakeServer()
- throws Exception {
- String serverInstance = CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE +
NetUtils.getHostAddress() + "_"
- + CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT;
-
- // Create server instance with the fake server state model
- HelixManager serverHelixManager = HelixManagerFactory
- .getZKHelixManager(getHelixClusterName(), serverInstance,
InstanceType.PARTICIPANT, getZkUrl());
- serverHelixManager.connect();
-
- // Add Helix tag to the server
-
serverHelixManager.getClusterManagmentTool().addInstanceTag(getHelixClusterName(),
serverInstance,
-
TableNameBuilder.OFFLINE.tableNameWithType(TagNameUtils.DEFAULT_TENANT_NAME));
- }
-
- @AfterClass
- public void teardown() {
- stopController();
- stopZk();
- }
-
- @Test
- public void testSchemaCleanupTask()
- throws Exception {
- PinotMetricUtils.cleanUp();
- PinotMetricUtils.getPinotMetricsRegistry();
- // 1. Add a schema
- addSchema(createDummySchema("t1"));
- addSchema(createDummySchema("t2"));
- addSchema(createDummySchema("t3"));
-
- // 2. Add a table with the schema name reference
- addTableConfig(createDummyTableConfig("t1", "t1"));
- addTableConfig(createDummyTableConfig("t2", "t2"));
- addTableConfig(createDummyTableConfig("t3", "t3"));
-
- _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t1",
"t2"));
- _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t2",
"t3"));
- _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t3",
"t1"));
-
- // 3. Fix table schema
- _controllerStarter.fixSchemaNameInTableConfig();
-
- // 4. validate
- assertEquals(getHelixResourceManager().getAllTables().size(), 3);
- assertEquals(getHelixResourceManager().getSchemaNames().size(), 3);
-
-
assertNull(getHelixResourceManager().getTableConfig("t1_OFFLINE").getValidationConfig().getSchemaName());
-
assertNull(getHelixResourceManager().getTableConfig("t2_OFFLINE").getValidationConfig().getSchemaName());
-
assertNull(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName());
-
-
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
- ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT), 3);
-
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
- ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT), 0);
-
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
- ControllerGauge.FIXED_SCHEMA_TABLE_COUNT), 3);
-
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
- ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT), 0);
-
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
- ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT), 0);
-
- // 5. Clean up
- for (String table : getHelixResourceManager().getAllOfflineTables()) {
- getHelixResourceManager().deleteOfflineTable(table);
- }
- for (String schema : getHelixResourceManager().getSchemaNames()) {
- getHelixResourceManager().deleteSchema(schema);
- }
- }
-
- @Test
- public void testSchemaCleanupTaskNormalCase()
- throws Exception {
- PinotMetricUtils.cleanUp();
- PinotMetricUtils.getPinotMetricsRegistry();
- // 1. Add a schema
- addSchema(createDummySchema("t1"));
- addSchema(createDummySchema("t2"));
- addSchema(createDummySchema("t3"));
-
- assertEquals(getHelixResourceManager().getSchemaNames().size(), 3);
-
- // 2. Add a table with the schema name reference
- addTableConfig(createDummyTableConfig("t1", "t1"));
- addTableConfig(createDummyTableConfig("t2", "t2"));
- addTableConfig(createDummyTableConfig("t3", "t3"));
-
- assertEquals(getHelixResourceManager().getAllTables().size(), 3);
-
- // 3. Create new schemas and update table to new schema
- addSchema(createDummySchema("t11"));
- addSchema(createDummySchema("t21"));
- addSchema(createDummySchema("t31"));
- _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t1",
"t11"));
- _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t2",
"t21"));
- _helixResourceManager.setExistingTableConfig(createDummyTableConfig("t3",
"t31"));
-
- assertEquals(getHelixResourceManager().getAllTables().size(), 3);
- assertEquals(getHelixResourceManager().getSchemaNames().size(), 6);
-
assertEquals(getHelixResourceManager().getTableConfig("t1_OFFLINE").getValidationConfig().getSchemaName(),
"t11");
-
assertEquals(getHelixResourceManager().getTableConfig("t2_OFFLINE").getValidationConfig().getSchemaName(),
"t21");
-
assertEquals(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName(),
"t31");
-
- // 4. Delete schema t1, t2, t3, so we can check if those schemas are fixed
later.
- deleteSchema("t1");
- deleteSchema("t2");
- deleteSchema("t3");
-
- assertEquals(getHelixResourceManager().getSchemaNames().size(), 3);
-
- // 5. Fix table schema
- _controllerStarter.fixSchemaNameInTableConfig();
-
- // 6. All tables will directly set schema.
- assertEquals(getHelixResourceManager().getAllTables().size(), 3);
- assertEquals(getHelixResourceManager().getSchemaNames().size(), 6);
- assertTrue(getHelixResourceManager().getSchemaNames().contains("t1"));
- assertTrue(getHelixResourceManager().getSchemaNames().contains("t2"));
- assertTrue(getHelixResourceManager().getSchemaNames().contains("t3"));
-
-
assertNull(getHelixResourceManager().getTableConfig("t1_OFFLINE").getValidationConfig().getSchemaName());
-
assertNull(getHelixResourceManager().getTableConfig("t2_OFFLINE").getValidationConfig().getSchemaName());
-
assertNull(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName());
-
-
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
- ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT), 3);
-
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
- ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT), 0);
-
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
- ControllerGauge.FIXED_SCHEMA_TABLE_COUNT), 3);
-
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
- ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT), 0);
-
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
- ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT), 0);
-
- // 7. Clean up
- for (String table : getHelixResourceManager().getAllOfflineTables()) {
- getHelixResourceManager().deleteOfflineTable(table);
- }
- for (String schema : getHelixResourceManager().getSchemaNames()) {
- getHelixResourceManager().deleteSchema(schema);
- }
- }
-
- @Test
- public void testMissingSchema()
- throws Exception {
- PinotMetricUtils.cleanUp();
- PinotMetricUtils.getPinotMetricsRegistry();
- // 1. Add a schema
- addSchema(createDummySchema("t1"));
- addSchema(createDummySchema("t2"));
- addSchema(createDummySchema("t3"));
-
- assertEquals(getHelixResourceManager().getSchemaNames().size(), 3);
-
- // 2. Add a table with the schema name reference
- addTableConfig(createDummyTableConfig("t1"));
- addTableConfig(createDummyTableConfig("t2"));
- addTableConfig(createDummyTableConfig("t3"));
-
- assertEquals(getHelixResourceManager().getAllTables().size(), 3);
-
- // 4. Delete schema t1, t2, t3, so we can check if those schemas are fixed
later.
- deleteSchema("t1");
- deleteSchema("t2");
- deleteSchema("t3");
-
- assertEquals(getHelixResourceManager().getSchemaNames().size(), 0);
-
- // 5. Fix table schema
- _controllerStarter.fixSchemaNameInTableConfig();
-
- // 6. We cannot fix schema
- assertEquals(getHelixResourceManager().getAllTables().size(), 3);
- assertEquals(getHelixResourceManager().getSchemaNames().size(), 0);
-
-
assertNull(getHelixResourceManager().getTableConfig("t1_OFFLINE").getValidationConfig().getSchemaName());
-
assertNull(getHelixResourceManager().getTableConfig("t2_OFFLINE").getValidationConfig().getSchemaName());
-
assertNull(getHelixResourceManager().getTableConfig("t3_OFFLINE").getValidationConfig().getSchemaName());
-
-
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
- ControllerGauge.MISCONFIGURED_SCHEMA_TABLE_COUNT), 0);
-
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
- ControllerGauge.TABLE_WITHOUT_SCHEMA_COUNT), 3);
-
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
- ControllerGauge.FIXED_SCHEMA_TABLE_COUNT), 0);
-
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
- ControllerGauge.FAILED_TO_COPY_SCHEMA_COUNT), 0);
-
assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(),
- ControllerGauge.FAILED_TO_UPDATE_TABLE_CONFIG_COUNT), 0);
-
- // 7. Clean up
- for (String table : getHelixResourceManager().getAllOfflineTables()) {
- getHelixResourceManager().deleteOfflineTable(table);
- }
- for (String schema : getHelixResourceManager().getSchemaNames()) {
- getHelixResourceManager().deleteSchema(schema);
- }
- }
-
- private TableConfig createDummyTableConfig(String table) {
- return new TableConfigBuilder(TableType.OFFLINE)
- .setTableName(table)
- .build();
- }
-
- private TableConfig createDummyTableConfig(String table, String schema) {
- TableConfig tableConfig = createDummyTableConfig(table);
- tableConfig.getValidationConfig().setSchemaName(schema);
- return tableConfig;
- }
-}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index fa985f2b06..13edaf56cf 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -96,7 +96,6 @@ import
org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -358,16 +357,13 @@ public abstract class BaseTableDataManager implements
TableDataManager {
public Pair<TableConfig, Schema> fetchTableConfigAndSchema() {
TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
Preconditions.checkState(tableConfig != null, "Failed to find table config
for table: %s", _tableNameWithType);
- Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
tableConfig);
- // NOTE: Schema is mandatory for REALTIME table.
- if (tableConfig.getTableType() == TableType.REALTIME) {
- Preconditions.checkState(schema != null, "Failed to find schema for
table: %s", _tableNameWithType);
- }
+ Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
_tableNameWithType);
+ Preconditions.checkState(schema != null, "Failed to find schema for table:
%s", _tableNameWithType);
return Pair.of(tableConfig, schema);
}
@Override
- public IndexLoadingConfig getIndexLoadingConfig(TableConfig tableConfig,
@Nullable Schema schema) {
+ public IndexLoadingConfig getIndexLoadingConfig(TableConfig tableConfig,
Schema schema) {
IndexLoadingConfig indexLoadingConfig = new
IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema);
indexLoadingConfig.setTableDataDir(_tableDataDir);
return indexLoadingConfig;
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 70ecd19d5c..32de491a30 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -2078,14 +2078,13 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
tableConfig.getIndexingConfig().getNoDictionaryColumns().remove("NewAddedRawDerivedMVIntDimension");
updateTableConfig(tableConfig);
- // Need to first delete then add the schema because removing columns is
backward-incompatible change
- deleteSchema(getTableName());
+ // Need to force update the schema because removing columns is
backward-incompatible change
Schema schema = createSchema();
schema.removeField("AirlineID");
schema.removeField("ArrTime");
schema.removeField("AirTime");
schema.removeField("ArrDel15");
- addSchema(schema);
+ forceUpdateSchema(schema);
// Trigger reload
reloadAllSegments(SELECT_STAR_QUERY, true, getCountStarResult());
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
index 9a0b23bae4..8f76ce7f6e 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
@@ -55,7 +55,10 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
public class RefreshSegmentMinionClusterIntegrationTest extends
BaseClusterIntegrationTest {
@@ -151,13 +154,12 @@ public class RefreshSegmentMinionClusterIntegrationTest
extends BaseClusterInteg
String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
// Change datatype from INT -> LONG for airlineId
- deleteSchema(getTableName());
Schema schema = createSchema();
schema.getFieldSpecFor("ArrTime").setDataType(FieldSpec.DataType.LONG);
schema.getFieldSpecFor("AirlineID").setDataType(FieldSpec.DataType.STRING);
schema.getFieldSpecFor("ActualElapsedTime").setDataType(FieldSpec.DataType.FLOAT);
schema.getFieldSpecFor("DestAirportID").setDataType(FieldSpec.DataType.STRING);
- addSchema(schema);
+ forceUpdateSchema(schema);
assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
.setTablesToSchedule(Collections.singleton(offlineTableName)))
@@ -209,13 +211,12 @@ public class RefreshSegmentMinionClusterIntegrationTest
extends BaseClusterInteg
});
// Reset the schema back to it's original state.
- deleteSchema(getTableName());
schema = createSchema();
schema.getFieldSpecFor("ArrTime").setDataType(FieldSpec.DataType.INT);
schema.getFieldSpecFor("AirlineID").setDataType(FieldSpec.DataType.LONG);
schema.getFieldSpecFor("ActualElapsedTime").setDataType(FieldSpec.DataType.INT);
schema.getFieldSpecFor("DestAirportID").setDataType(FieldSpec.DataType.INT);
- addSchema(schema);
+ forceUpdateSchema(schema);
}
@Test(priority = 3)
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
index 2bc366a371..bcc9a0883d 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
@@ -81,7 +81,6 @@ public class RefreshSegmentTaskGenerator extends
BaseTaskGenerator {
String tableNameWithType = tableConfig.getTableName();
Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for
Table: %s", tableNameWithType);
-
String taskType = RefreshSegmentTask.TASK_TYPE;
List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
PinotHelixResourceManager pinotHelixResourceManager =
_clusterInfoAccessor.getPinotHelixResourceManager();
@@ -102,7 +101,8 @@ public class RefreshSegmentTaskGenerator extends
BaseTaskGenerator {
// Get info about table and schema.
Stat tableStat = pinotHelixResourceManager.getTableStat(tableNameWithType);
- Schema schema =
pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
+ Schema schema =
pinotHelixResourceManager.getTableSchema(tableNameWithType);
+ Preconditions.checkState(schema != null, "Failed to find schema for table:
%s", tableNameWithType);
Stat schemaStat =
pinotHelixResourceManager.getSchemaStat(schema.getSchemaName());
// Get the running segments for a table.
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
index 754f7224a2..a4874a1cc9 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
@@ -43,7 +43,6 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -525,8 +524,6 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
.addDateTime(TIME_COLUMN_NAME, FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
.setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
-
when(mockPinotHelixResourceManager.getSchemaForTableConfig(Mockito.any())).thenReturn(schema);
-
RealtimeToOfflineSegmentsTaskGenerator taskGenerator = new
RealtimeToOfflineSegmentsTaskGenerator();
taskGenerator.init(mockClusterInfoAccessor);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index bef2d7fa84..7f60004fdb 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -308,7 +308,7 @@ public interface TableDataManager {
/**
* Constructs the index loading config for the table with the given table
config and schema.
*/
- IndexLoadingConfig getIndexLoadingConfig(TableConfig tableConfig, @Nullable
Schema schema);
+ IndexLoadingConfig getIndexLoadingConfig(TableConfig tableConfig, Schema
schema);
/**
* Interface to handle segment state transitions from CONSUMING to DROPPED
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 5bf6ef8bc8..d28225c923 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -32,7 +32,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
@@ -90,7 +89,6 @@ import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.DataSizeUtils;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.TimeUtils;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -137,7 +135,7 @@ public final class TableConfigUtils {
/**
* @see TableConfigUtils#validate(TableConfig, Schema, String)
*/
- public static void validate(TableConfig tableConfig, @Nullable Schema
schema) {
+ public static void validate(TableConfig tableConfig, Schema schema) {
validate(tableConfig, schema, null);
}
@@ -152,17 +150,14 @@ public final class TableConfigUtils {
*
* TODO: Add more validations for each section (e.g. validate conditions are
met for aggregateMetrics)
*/
- public static void validate(TableConfig tableConfig, @Nullable Schema
schema, @Nullable String typesToSkip) {
+ public static void validate(TableConfig tableConfig, Schema schema,
@Nullable String typesToSkip) {
+ Preconditions.checkArgument(schema != null, "Schema should not be null");
Set<ValidationType> skipTypes = parseTypesToSkipString(typesToSkip);
- if (tableConfig.getTableType() == TableType.REALTIME) {
- Preconditions.checkState(schema != null, "Schema should not be null for
REALTIME table");
- }
// Sanitize the table config before validation
sanitize(tableConfig);
// skip all validation if skip type ALL is selected.
if (!skipTypes.contains(ValidationType.ALL)) {
- validateTableSchemaConfig(tableConfig);
validateValidationConfig(tableConfig, schema);
validateIngestionConfig(tableConfig, schema);
@@ -205,7 +200,7 @@ public final class TableConfigUtils {
* @param tableConfig Table config to validate
* @return true if the table config is using instance pool and replica group
configuration, false otherwise
*/
- static boolean isTableUsingInstancePoolAndReplicaGroup(@Nonnull TableConfig
tableConfig) {
+ static boolean isTableUsingInstancePoolAndReplicaGroup(TableConfig
tableConfig) {
boolean status = true;
Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap =
tableConfig.getInstanceAssignmentConfigMap();
if (instanceAssignmentConfigMap != null) {
@@ -253,19 +248,6 @@ public final class TableConfigUtils {
}
}
- /**
- * Validates the table name with the following rule:
- * - Schema name should either be null or match the raw table name
- */
- private static void validateTableSchemaConfig(TableConfig tableConfig) {
- // Ensure that table is not created if schema is not present
- String rawTableName =
TableNameBuilder.extractRawTableName(tableConfig.getTableName());
- String schemaName = tableConfig.getValidationConfig().getSchemaName();
- if (schemaName != null && !schemaName.equals(rawTableName)) {
- throw new IllegalStateException("Schema name: " + schemaName + " does
not match table name: " + rawTableName);
- }
- }
-
/**
* Validates retention config. Checks for following things:
* - Valid segmentPushType
@@ -314,7 +296,7 @@ public final class TableConfigUtils {
* 3. Checks peerDownloadSchema
* 4. Checks time column existence if null handling for time column is
enabled
*/
- private static void validateValidationConfig(TableConfig tableConfig,
@Nullable Schema schema) {
+ private static void validateValidationConfig(TableConfig tableConfig, Schema
schema) {
SegmentsValidationAndRetentionConfig validationConfig =
tableConfig.getValidationConfig();
String timeColumnName = validationConfig.getTimeColumnName();
if (tableConfig.getTableType() == TableType.REALTIME) {
@@ -322,7 +304,7 @@ public final class TableConfigUtils {
Preconditions.checkState(timeColumnName != null, "'timeColumnName'
cannot be null in REALTIME table config");
}
// timeColumnName can be null in OFFLINE table
- if (timeColumnName != null && !timeColumnName.isEmpty() && schema != null)
{
+ if (timeColumnName != null) {
Preconditions.checkState(schema.getSpecForTimeColumn(timeColumnName) !=
null,
"Cannot find valid fieldSpec for timeColumn: %s from the table
config: %s, in the schema: %s", timeColumnName,
tableConfig.getTableName(), schema.getSchemaName());
@@ -330,8 +312,8 @@ public final class TableConfigUtils {
if (tableConfig.isDimTable()) {
Preconditions.checkState(tableConfig.getTableType() == TableType.OFFLINE,
"Dimension table must be of OFFLINE table type.");
- Preconditions.checkState(schema != null, "Dimension table must have an
associated schema");
- Preconditions.checkState(!schema.getPrimaryKeyColumns().isEmpty(),
"Dimension table must have primary key[s]");
+
Preconditions.checkState(CollectionUtils.isNotEmpty(schema.getPrimaryKeyColumns()),
+ "Dimension table must have primary key[s]");
}
String peerSegmentDownloadScheme =
validationConfig.getPeerSegmentDownloadScheme();
@@ -360,7 +342,7 @@ public final class TableConfigUtils {
* 6. ingestion type for dimension tables
*/
@VisibleForTesting
- public static void validateIngestionConfig(TableConfig tableConfig,
@Nullable Schema schema) {
+ public static void validateIngestionConfig(TableConfig tableConfig, Schema
schema) {
IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
if (ingestionConfig != null) {
@@ -395,7 +377,7 @@ public final class TableConfigUtils {
Preconditions.checkState(indexingConfig == null ||
MapUtils.isEmpty(indexingConfig.getStreamConfigs()),
"Should not use indexingConfig#getStreamConfigs if
ingestionConfig#StreamIngestionConfig is provided");
List<Map<String, String>> streamConfigMaps =
ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps();
- Preconditions.checkState(streamConfigMaps.size() > 0, "Must have at
least 1 stream in REALTIME table");
+ Preconditions.checkState(!streamConfigMaps.isEmpty(), "Must have at
least 1 stream in REALTIME table");
// TODO: for multiple stream configs, validate them
}
@@ -431,14 +413,11 @@ public final class TableConfigUtils {
"columnName/aggregationFunction cannot be null in
AggregationConfig " + aggregationConfig);
}
- FieldSpec fieldSpec = null;
- if (schema != null) {
- fieldSpec = schema.getFieldSpecFor(columnName);
- Preconditions.checkState(fieldSpec != null, "The destination
column '" + columnName
- + "' of the aggregation function must be present in the
schema");
- Preconditions.checkState(fieldSpec.getFieldType() ==
FieldSpec.FieldType.METRIC,
- "The destination column '" + columnName + "' of the
aggregation function must be a metric column");
- }
+ FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
+ Preconditions.checkState(fieldSpec != null,
+ "The destination column '" + columnName + "' of the aggregation
function must be present in the schema");
+ Preconditions.checkState(fieldSpec.getFieldType() ==
FieldSpec.FieldType.METRIC,
+ "The destination column '" + columnName + "' of the aggregation
function must be a metric column");
if (!aggregationColumns.add(columnName)) {
throw new IllegalStateException("Duplicate aggregation config
found for column '" + columnName + "'");
@@ -471,11 +450,9 @@ public final class TableConfigUtils {
Preconditions.checkState(StringUtils.isNumeric(literal),
"Second argument of DISTINCT_COUNT_HLL must be a number:
%s", aggregationConfig);
}
- if (fieldSpec != null) {
- DataType dataType = fieldSpec.getDataType();
- Preconditions.checkState(dataType == DataType.BYTES,
- "Result type for DISTINCT_COUNT_HLL must be BYTES: %s",
aggregationConfig);
- }
+ DataType dataType = fieldSpec.getDataType();
+ Preconditions.checkState(dataType == DataType.BYTES, "Result type
for DISTINCT_COUNT_HLL must be BYTES: %s",
+ aggregationConfig);
} else if (functionType == DISTINCTCOUNTHLLPLUS) {
Preconditions.checkState(numArguments >= 1 && numArguments <= 3,
"DISTINCT_COUNT_HLL_PLUS can have at most three arguments:
%s", aggregationConfig);
@@ -525,10 +502,8 @@ public final class TableConfigUtils {
aggregationSourceColumns.add(firstArgument.getIdentifier());
}
- if (schema != null) {
- Preconditions.checkState(new
HashSet<>(schema.getMetricNames()).equals(aggregationColumns),
- "all metric columns must be aggregated");
- }
+ Preconditions.checkState(new
HashSet<>(schema.getMetricNames()).equals(aggregationColumns),
+ "all metric columns must be aggregated");
// This is required by
MutableSegmentImpl.enableMetricsAggregationIfPossible().
// That code will disable ingestion aggregation if all metrics aren't
noDictionaryColumns.
@@ -565,13 +540,10 @@ public final class TableConfigUtils {
if (!transformColumns.add(columnName)) {
throw new IllegalStateException("Duplicate transform config found
for column '" + columnName + "'");
}
- if (schema != null) {
- Preconditions.checkState(
- schema.getFieldSpecFor(columnName) != null ||
aggregationSourceColumns.contains(columnName),
- "The destination column '" + columnName
- + "' of the transform function must be present in the
schema or as a source column for "
- + "aggregations");
- }
+ Preconditions.checkState(schema.hasColumn(columnName) ||
aggregationSourceColumns.contains(columnName),
+ "The destination column '" + columnName
+ + "' of the transform function must be present in the schema
or as a source column for "
+ + "aggregations");
FunctionEvaluator expressionEvaluator;
if (_disableGroovy &&
FunctionEvaluatorFactory.isGroovyExpression(transformFunction)) {
throw new IllegalStateException(
@@ -595,7 +567,7 @@ public final class TableConfigUtils {
// Complex configs
ComplexTypeConfig complexTypeConfig =
ingestionConfig.getComplexTypeConfig();
- if (complexTypeConfig != null && schema != null) {
+ if (complexTypeConfig != null) {
Map<String, String> prefixesToRename =
complexTypeConfig.getPrefixesToRename();
if (MapUtils.isNotEmpty(prefixesToRename)) {
Set<String> fieldNames = schema.getColumnNames();
@@ -611,7 +583,7 @@ public final class TableConfigUtils {
SchemaConformingTransformerConfig schemaConformingTransformerConfig =
ingestionConfig.getSchemaConformingTransformerConfig();
- if (null != schemaConformingTransformerConfig && null != schema) {
+ if (schemaConformingTransformerConfig != null) {
SchemaConformingTransformer.validateSchema(schema,
schemaConformingTransformerConfig);
}
}
@@ -983,10 +955,7 @@ public final class TableConfigUtils {
* Also ensures proper dependency between index types (eg: Inverted Index
columns
* cannot be present in no-dictionary columns).
*/
- private static void validateIndexingConfig(IndexingConfig indexingConfig,
@Nullable Schema schema) {
- if (schema == null) {
- return;
- }
+ private static void validateIndexingConfig(IndexingConfig indexingConfig,
Schema schema) {
ArrayListMultimap<String, String> columnNameToConfigMap =
ArrayListMultimap.create();
Set<String> noDictionaryColumnsSet = new HashSet<>();
@@ -1189,7 +1158,7 @@ public final class TableConfigUtils {
* Additional checks for TEXT and FST index types
* Validates index compatibility for forward index disabled columns
*/
- private static void validateFieldConfigList(TableConfig tableConfig,
@Nullable Schema schema) {
+ private static void validateFieldConfigList(TableConfig tableConfig, Schema
schema) {
List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
TableType tableType = tableConfig.getTableType();
@@ -1200,6 +1169,9 @@ public final class TableConfigUtils {
for (FieldConfig fieldConfig : fieldConfigList) {
String columnName = fieldConfig.getName();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
+ Preconditions.checkState(fieldSpec != null,
+ "Column: %s defined in field config list must be a valid column
defined in the schema", columnName);
EncodingType encodingType = fieldConfig.getEncodingType();
Preconditions.checkArgument(encodingType != null, "Encoding type must be
specified for column: %s", columnName);
CompressionCodec compressionCodec = fieldConfig.getCompressionCodec();
@@ -1211,10 +1183,8 @@ public final class TableConfigUtils {
"Compression codec: %s is not applicable to raw index",
compressionCodec);
if ((compressionCodec == CompressionCodec.CLP || compressionCodec ==
CompressionCodec.CLPV2
- || compressionCodec == CompressionCodec.CLPV2_ZSTD ||
compressionCodec == CompressionCodec.CLPV2_LZ4)
- && schema != null) {
- Preconditions.checkArgument(
-
schema.getFieldSpecFor(columnName).getDataType().getStoredType() ==
DataType.STRING,
+ || compressionCodec == CompressionCodec.CLPV2_ZSTD ||
compressionCodec == CompressionCodec.CLPV2_LZ4)) {
+
Preconditions.checkArgument(fieldSpec.getDataType().getStoredType() ==
DataType.STRING,
"CLP compression codec can only be applied to string columns");
}
break;
@@ -1232,13 +1202,6 @@ public final class TableConfigUtils {
break;
}
- if (schema == null) {
- return;
- }
- FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
- Preconditions.checkState(fieldSpec != null,
- "Column: %s defined in field config list must be a valid column
defined in the schema", columnName);
-
// Validate the forward index disabled compatibility with other indexes
if enabled for this column
validateForwardIndexDisabledIndexCompatibility(columnName, fieldConfig,
indexingConfig, schema, tableType);
@@ -1493,7 +1456,7 @@ public final class TableConfigUtils {
return false;
}
- private static boolean isRoutingStrategyAllowedForUpsert(@Nonnull
RoutingConfig routingConfig) {
+ private static boolean isRoutingStrategyAllowedForUpsert(RoutingConfig
routingConfig) {
String instanceSelectorType = routingConfig.getInstanceSelectorType();
return UPSERT_DEDUP_ALLOWED_ROUTING_STRATEGIES.stream().anyMatch(x ->
x.equalsIgnoreCase(instanceSelectorType));
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index b5ca43e4ba..b3bf9d3e1a 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -91,7 +91,7 @@ public class TableConfigUtilsTest {
try {
TableConfigUtils.validate(tableConfig, null);
Assert.fail("Should fail for null timeColumnName and null schema in
REALTIME table");
- } catch (IllegalStateException e) {
+ } catch (IllegalArgumentException e) {
// expected
}
@@ -101,7 +101,7 @@ public class TableConfigUtilsTest {
try {
TableConfigUtils.validate(tableConfig, null);
Assert.fail("Should fail for null schema in REALTIME table");
- } catch (IllegalStateException e) {
+ } catch (IllegalArgumentException e) {
// expected
}
@@ -149,12 +149,22 @@ public class TableConfigUtilsTest {
// OFFLINE table
// null timeColumnName and schema - not allowed, schema is required for OFFLINE tables as well
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
- TableConfigUtils.validate(tableConfig, null);
+ try {
+ TableConfigUtils.validate(tableConfig, null);
+ Assert.fail("Should fail for null timeColumnName and null schema in
OFFLINE table");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
// null schema only - not allowed, schema is required for OFFLINE tables as well
tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
- TableConfigUtils.validate(tableConfig, null);
+ try {
+ TableConfigUtils.validate(tableConfig, null);
+ Assert.fail("Should fail for null schema in OFFLINE table");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
// null timeColumnName only - allowed in OFFLINE
schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
@@ -172,7 +182,7 @@ public class TableConfigUtilsTest {
// expected
}
- // non-null schema nd timeColumnName, but timeColumnName not present as a
time spec in schema
+ // non-null schema and timeColumnName, but timeColumnName not present as a
time spec in schema
schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
.addSingleValueDimension(TIME_COLUMN,
FieldSpec.DataType.STRING).build();
tableConfig =
@@ -184,11 +194,6 @@ public class TableConfigUtilsTest {
// expected
}
- // empty timeColumnName - valid
- schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName("").build();
- TableConfigUtils.validate(tableConfig, schema);
-
// valid
schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
.addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
@@ -217,7 +222,7 @@ public class TableConfigUtilsTest {
try {
TableConfigUtils.validate(tableConfig, null);
Assert.fail("Should fail with a Dimension table without a schema");
- } catch (IllegalStateException e) {
+ } catch (IllegalArgumentException e) {
// expected
}
@@ -327,7 +332,7 @@ public class TableConfigUtilsTest {
// invalid transform config since Groovy is disabled
try {
TableConfigUtils.setDisableGroovy(true);
- TableConfigUtils.validate(tableConfig, schema, null);
+ TableConfigUtils.validate(tableConfig, schema);
// Reset to false
TableConfigUtils.setDisableGroovy(false);
Assert.fail("Should fail when Groovy functions disabled but found in
transform config");
@@ -363,7 +368,7 @@ public class TableConfigUtilsTest {
ingestionConfig.setFilterConfig(new FilterConfig("Groovy({timestamp > 0},
timestamp)"));
try {
TableConfigUtils.setDisableGroovy(true);
- TableConfigUtils.validate(tableConfig, schema, null);
+ TableConfigUtils.validate(tableConfig, schema);
// Reset to false
TableConfigUtils.setDisableGroovy(false);
Assert.fail("Should fail when Groovy functions disabled but found in
filter config");
@@ -677,23 +682,27 @@ public class TableConfigUtilsTest {
@Test
public void ingestionStreamConfigsTest() {
+ Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+ .addDateTime("timeColumn", FieldSpec.DataType.TIMESTAMP,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+ .build();
Map<String, String> streamConfigs = getStreamConfigs();
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setStreamIngestionConfig(new
StreamIngestionConfig(Arrays.asList(streamConfigs, streamConfigs)));
- TableConfig tableConfig =
- new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
- .setIngestionConfig(ingestionConfig).build();
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+ .setTimeColumnName("timeColumn")
+ .setIngestionConfig(ingestionConfig)
+ .build();
// Multiple stream configs are allowed
try {
- TableConfigUtils.validateIngestionConfig(tableConfig, null);
+ TableConfigUtils.validateIngestionConfig(tableConfig, schema);
} catch (IllegalStateException e) {
Assert.fail("Multiple stream configs should be supported");
}
// stream config should be valid
ingestionConfig.setStreamIngestionConfig(new
StreamIngestionConfig(Collections.singletonList(streamConfigs)));
- TableConfigUtils.validateIngestionConfig(tableConfig, null);
+ TableConfigUtils.validateIngestionConfig(tableConfig, schema);
// validate the proto decoder
streamConfigs = getKafkaStreamConfigs();
@@ -759,6 +768,8 @@ public class TableConfigUtilsTest {
@Test
public void ingestionBatchConfigsTest() {
+ Schema schema = new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
+
Map<String, String> batchConfigMap = new HashMap<>();
batchConfigMap.put(BatchConfigProperties.INPUT_DIR_URI, "s3://foo");
batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, "gs://bar");
@@ -773,11 +784,14 @@ public class TableConfigUtilsTest {
new BatchIngestionConfig(Arrays.asList(batchConfigMap,
batchConfigMap), null, null));
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(ingestionConfig).build();
- TableConfigUtils.validateIngestionConfig(tableConfig, null);
+ TableConfigUtils.validateIngestionConfig(tableConfig, schema);
}
@Test
public void ingestionConfigForDimensionTableTest() {
+ Schema schema =
+ new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).setPrimaryKeyColumns(List.of("pk")).build();
+
Map<String, String> batchConfigMap = new HashMap<>();
batchConfigMap.put(BatchConfigProperties.INPUT_DIR_URI, "s3://foo");
batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, "gs://bar");
@@ -792,12 +806,12 @@ public class TableConfigUtilsTest {
new BatchIngestionConfig(Collections.singletonList(batchConfigMap),
"REFRESH", null));
TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIsDimTable(true)
.setIngestionConfig(ingestionConfig).build();
- TableConfigUtils.validateIngestionConfig(tableConfig, null);
+ TableConfigUtils.validateIngestionConfig(tableConfig, schema);
// dimension tables should have batch ingestion config
ingestionConfig.setBatchIngestionConfig(null);
try {
- TableConfigUtils.validateIngestionConfig(tableConfig, null);
+ TableConfigUtils.validateIngestionConfig(tableConfig, schema);
Assert.fail("Should fail for Dimension table without batch ingestion
config");
} catch (IllegalStateException e) {
// expected
@@ -807,7 +821,7 @@ public class TableConfigUtilsTest {
ingestionConfig.setBatchIngestionConfig(
new BatchIngestionConfig(Collections.singletonList(batchConfigMap),
"APPEND", null));
try {
- TableConfigUtils.validateIngestionConfig(tableConfig, null);
+ TableConfigUtils.validateIngestionConfig(tableConfig, schema);
Assert.fail("Should fail for Dimension table with ingestion type APPEND
(should be REFRESH)");
} catch (IllegalStateException e) {
// expected
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadZKClient.java
b/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadZKClient.java
index 6df20fe491..c504af4a65 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadZKClient.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadZKClient.java
@@ -154,26 +154,24 @@ public class PredownloadZKClient implements AutoCloseable
{
* @param tableInfoMap Map of table name to table info to be
filled with table config.
*/
public void updateSegmentMetadata(List<PredownloadSegmentInfo>
predownloadSegmentInfoList,
- Map<String, PredownloadTableInfo> tableInfoMap,
- InstanceDataManagerConfig instanceDataManagerConfig) {
+ Map<String, PredownloadTableInfo> tableInfoMap,
InstanceDataManagerConfig instanceDataManagerConfig) {
// fallback path comes from ZKHelixManager.class getHelixPropertyStore
method
ZkHelixPropertyStore<ZNRecord> propertyStore = new
AutoFallbackPropertyStore<>(new ZkBaseDataAccessor<>(_zkClient),
PropertyPathBuilder.propertyStore(_clusterName),
String.format("/%s/%s", _clusterName, "HELIX_PROPERTYSTORE"));
for (PredownloadSegmentInfo predownloadSegmentInfo :
predownloadSegmentInfoList) {
-
tableInfoMap.computeIfAbsent(predownloadSegmentInfo.getTableNameWithType(),
name -> {
- TableConfig tableConfig =
- ZKMetadataProvider.getTableConfig(propertyStore,
predownloadSegmentInfo.getTableNameWithType());
+ String tableNameWithType = predownloadSegmentInfo.getTableNameWithType();
+ tableInfoMap.computeIfAbsent(tableNameWithType, name -> {
+ TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(propertyStore, tableNameWithType);
if (tableConfig == null) {
LOGGER.warn("Cannot predownload segment {} because not able to get
its table config from ZK",
predownloadSegmentInfo.getSegmentName());
return null;
}
- Schema schema = ZKMetadataProvider.getTableSchema(propertyStore,
tableConfig);
+ Schema schema = ZKMetadataProvider.getTableSchema(propertyStore,
tableNameWithType);
return new PredownloadTableInfo(name, tableConfig, schema,
instanceDataManagerConfig);
});
- SegmentZKMetadata segmentZKMetadata =
- ZKMetadataProvider.getSegmentZKMetadata(propertyStore,
predownloadSegmentInfo.getTableNameWithType(),
- predownloadSegmentInfo.getSegmentName());
+ SegmentZKMetadata segmentZKMetadata =
ZKMetadataProvider.getSegmentZKMetadata(propertyStore, tableNameWithType,
+ predownloadSegmentInfo.getSegmentName());
if (segmentZKMetadata == null) {
LOGGER.warn("Cannot predownload segment {} because not able to get its
metadata from ZK",
predownloadSegmentInfo.getSegmentName());
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
index 2bfdc051a6..536128b7c4 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
@@ -38,8 +38,6 @@ public class SegmentsValidationAndRetentionConfig extends
BaseJsonConfig {
private String _replication;
@Deprecated // Use _replication instead
private String _replicasPerPartition;
- @Deprecated // Schema name should be the same as raw table name
- private String _schemaName;
private String _timeColumnName;
private TimeUnit _timeType;
@Deprecated // Use SegmentAssignmentConfig instead
@@ -170,19 +168,6 @@ public class SegmentsValidationAndRetentionConfig extends
BaseJsonConfig {
_replicasPerPartition = replicasPerPartition;
}
- /**
- * @deprecated Schema name should be the same as raw table name
- */
- @Deprecated
- public String getSchemaName() {
- return _schemaName;
- }
-
- @Deprecated
- public void setSchemaName(String schemaName) {
- _schemaName = schemaName;
- }
-
/**
* @deprecated Use {@link InstanceAssignmentConfig} instead.
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]