This is an automated email from the ASF dual-hosted git repository.
xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6aae074def8 Add isMaterializedView flag on TableConfig as the single
MV identity (#18564)
6aae074def8 is described below
commit 6aae074def84a46b5ce93821fc3313121b9c9126
Author: Hongkun Xu <[email protected]>
AuthorDate: Fri May 22 16:11:21 2026 +0800
Add isMaterializedView flag on TableConfig as the single MV identity
(#18564)
Today, identifying a materialized view (MV) table requires scattered
heuristics: scanning task configs for MaterializedViewTask, fetching
the ZK definition znode, or parsing the definedSQL. This makes MV
identity ambiguous on the create/update path and forces every caller
that needs to ask "is this an MV?" to repeat the same multi-source
inference, which is also subject to ordering issues at table-create
time (the definition znode is created lazily by the scheduler on the
first task run, not synchronously at table create).
Introduce isMaterializedView on TableConfig as the single source of
truth for MV identity, modeled on isDimTable:
- pinot-spi: add IS_MATERIALIZED_VIEW_KEY, _materializedView field,
constructor/copy parameter, getter, and JsonIgnore helpers
getMaterializedViewTaskConfigs() /
hasMaterializedViewTaskWithDefinedSql(). TableConfigBuilder gains
setIsMaterializedView().
- pinot-common: persist isMaterializedView in the ZNRecord simple
fields; a missing key deserializes as false.
- pinot-sql-ddl: PropertyMapping accepts ismaterializedview;
PropertyExtractor emits it on reverse-render.
- pinot-segment-local TableConfigUtils:
* validateMaterializedViewInvariants: flag=true requires OFFLINE
+ MaterializedViewTask + non-empty definedSQL, and forbids
coexistence with isDimTable; MaterializedViewTask without the
flag (or without definedSQL) is rejected.
* validateBackwardCompatibility: isMaterializedView cannot be
toggled; definedSQL is immutable after MV creation;
MaterializedViewTask cannot be removed from an MV table.
- pinot-materialized-view:
* MaterializedViewAnalyzer.analyze now requires the flag.
* MaterializedViewTaskScheduler skips task generation when the
flag is false.
- pinot-controller PinotHelixResourceManager: both the table-create
and table-drop MV-consistency notifications are now gated on
tableConfig.isMaterializedView(); the drop path no longer falls
back to task-config scan or CalciteSqlParser to infer MV identity,
and instead logs a warning when an MV table is dropped without
definition metadata.
- airlineStatsMv example table config sets isMaterializedView: true.
Tests:
- MaterializedViewAnalyzerTest: every MV TableConfigBuilder now sets
setIsMaterializedView(true); new test asserts analyze fails when
the flag is missing.
- TableConfigUtilsTest: new tests cover the four
validateMaterializedViewInvariants branches and the
definedSQL-immutable / flag-cannot-change backward-compatibility
rules; existing positional TableConfig constructor call sites
updated for the new parameter.
- PinotDataWriter.scala (spark-3 connector) dummy TableConfig
constructor updated for the new positional parameter.
This change is intentionally identity-only: the existing ZK Definition
and Runtime znodes still drive the MV runtime view (watermark, partition
state) and the /materializedViews REST listing endpoint; aligning
those with the flag is left to a follow-up.
Signed-off-by: Hongkun Xu <[email protected]>
Co-authored-by: Xiang Fu <[email protected]>
Co-authored-by: Cursor <[email protected]>
---
.../common/utils/config/TableConfigSerDeUtils.java | 6 +-
.../helix/core/PinotHelixResourceManager.java | 46 +++++---------
.../analysis/MaterializedViewAnalyzer.java | 4 ++
.../scheduler/MaterializedViewTaskScheduler.java | 6 ++
.../analysis/MaterializedViewAnalyzerTest.java | 36 +++++++++++
.../segment/local/utils/TableConfigUtils.java | 69 ++++++++++++++++++++
.../segment/local/utils/TableConfigUtilsTest.java | 74 ++++++++++++++++++++++
.../apache/pinot/spi/config/table/TableConfig.java | 60 +++++++++++++++++-
.../spi/utils/builder/TableConfigBuilder.java | 9 ++-
.../pinot/sql/ddl/compile/PropertyMapping.java | 3 +
.../pinot/sql/ddl/reverse/PropertyExtractor.java | 3 +
.../airlineStatsMv_offline_table_config.json | 1 +
12 files changed, 285 insertions(+), 32 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtils.java
index a24583891fd..436e58f3ae5 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtils.java
@@ -66,6 +66,8 @@ public class TableConfigSerDeUtils {
String tableType = simpleFields.get(TableConfig.TABLE_TYPE_KEY);
boolean isDimTable =
Boolean.parseBoolean(simpleFields.get(TableConfig.IS_DIM_TABLE_KEY));
+ boolean isMaterializedView =
+
Boolean.parseBoolean(simpleFields.get(TableConfig.IS_MATERIALIZED_VIEW_KEY));
Preconditions.checkState(tableType != null,
FIELD_MISSING_MESSAGE_TEMPLATE, TableConfig.TABLE_TYPE_KEY);
String validationConfigString =
simpleFields.get(TableConfig.VALIDATION_CONFIG_KEY);
@@ -198,7 +200,8 @@ public class TableConfigSerDeUtils {
new TableConfig(tableName, tableType, validationConfig, tenantConfig,
indexingConfig, customConfig,
quotaConfig, taskConfig, routingConfig, queryConfig,
instanceAssignmentConfigMap, fieldConfigList,
upsertConfig, dedupConfig, dimensionTableConfig, ingestionConfig,
tierConfigList, isDimTable,
- tunerConfigList, instancePartitionsMap,
segmentAssignmentConfigMap, tableSamplerConfigs);
+ tunerConfigList, instancePartitionsMap, segmentAssignmentConfigMap,
+ tableSamplerConfigs, isMaterializedView);
tableConfig.setDescription(description);
tableConfig.setTags(tags);
return tableConfig;
@@ -216,6 +219,7 @@ public class TableConfigSerDeUtils {
simpleFields.put(TableConfig.INDEXING_CONFIG_KEY,
tableConfig.getIndexingConfig().toJsonString());
simpleFields.put(TableConfig.CUSTOM_CONFIG_KEY,
tableConfig.getCustomConfig().toJsonString());
simpleFields.put(TableConfig.IS_DIM_TABLE_KEY,
Boolean.toString(tableConfig.isDimTable()));
+ simpleFields.put(TableConfig.IS_MATERIALIZED_VIEW_KEY,
Boolean.toString(tableConfig.isMaterializedView()));
// Optional fields
QuotaConfig quotaConfig = tableConfig.getQuotaConfig();
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index dc4d50e5a1c..d5cb2b1d97f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -163,6 +163,7 @@ import
org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
import org.apache.pinot.controller.workload.QueryWorkloadManager;
import org.apache.pinot.core.util.NumberUtils;
import org.apache.pinot.core.util.NumericException;
+import org.apache.pinot.materializedview.analysis.MaterializedViewAnalyzer;
import
org.apache.pinot.materializedview.consistency.MaterializedViewConsistencyManager;
import
org.apache.pinot.materializedview.metadata.MaterializedViewDefinitionMetadata;
import
org.apache.pinot.materializedview.metadata.MaterializedViewDefinitionMetadataUtils;
@@ -174,7 +175,6 @@ import org.apache.pinot.spi.config.instance.Instance;
import org.apache.pinot.spi.config.instance.InstanceConfigValidatorRegistry;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableStatsHumanReadable;
-import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TagOverrideConfig;
import org.apache.pinot.spi.config.table.TenantConfig;
@@ -205,7 +205,6 @@ import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;
-import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -5377,24 +5376,27 @@ public class PinotHelixResourceManager {
private void
notifyMaterializedViewConsistencyManagerForTableCreate(TableConfig tableConfig)
{
MaterializedViewConsistencyManager mgr =
_materializedViewConsistencyManager;
- if (mgr == null) {
+ if (mgr == null || !tableConfig.isMaterializedView()) {
return;
}
try {
- TableTaskConfig taskConfig = tableConfig.getTaskConfig();
- if (taskConfig == null) {
+ MaterializedViewDefinitionMetadata definition =
+ MaterializedViewDefinitionMetadataUtils.fetch(_propertyStore,
tableConfig.getTableName());
+ if (definition != null && definition.getBaseTables() != null &&
!definition.getBaseTables().isEmpty()) {
+ mgr.onMaterializedViewTableCreated(tableConfig.getTableName(),
definition.getBaseTables());
return;
}
- Map<String, String> materializedViewTaskConfigs =
-
taskConfig.getConfigsForTaskType(CommonConstants.MaterializedViewTask.TASK_TYPE);
+ Map<String, String> materializedViewTaskConfigs =
tableConfig.getMaterializedViewTaskConfigs();
if (materializedViewTaskConfigs == null) {
+ LOGGER.warn("MV table {} has no MaterializedViewTask config for
consistency registration",
+ tableConfig.getTableName());
return;
}
String definedSQL =
materializedViewTaskConfigs.get(CommonConstants.MaterializedViewTask.DEFINED_SQL_KEY);
if (definedSQL == null || definedSQL.isEmpty()) {
return;
}
- String sourceTable =
CalciteSqlParser.compileToPinotQuery(definedSQL).getDataSource().getTableName();
+ String sourceTable =
MaterializedViewAnalyzer.extractSourceTableName(definedSQL);
mgr.onMaterializedViewTableCreated(tableConfig.getTableName(),
Collections.singletonList(sourceTable));
} catch (Exception e) {
LOGGER.warn("Failed to register MV table {} with consistency manager",
tableConfig.getTableName(), e);
@@ -5407,33 +5409,19 @@ public class PinotHelixResourceManager {
return;
}
try {
+ TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+ if (tableConfig != null && !tableConfig.isMaterializedView()) {
+ return;
+ }
MaterializedViewDefinitionMetadata materializedViewDefinition =
MaterializedViewDefinitionMetadataUtils.fetch(_propertyStore,
tableNameWithType);
if (materializedViewDefinition != null &&
materializedViewDefinition.getBaseTables() != null
&& !materializedViewDefinition.getBaseTables().isEmpty()) {
mgr.onMaterializedViewTableDropped(tableNameWithType,
materializedViewDefinition.getBaseTables());
- return;
- }
- // Fall back to reading source table from table task config
- TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
- if (tableConfig == null) {
- return;
- }
- TableTaskConfig taskConfig = tableConfig.getTaskConfig();
- if (taskConfig == null) {
- return;
- }
- Map<String, String> materializedViewTaskConfigs =
-
taskConfig.getConfigsForTaskType(CommonConstants.MaterializedViewTask.TASK_TYPE);
- if (materializedViewTaskConfigs == null) {
- return;
- }
- String definedSQL =
materializedViewTaskConfigs.get(CommonConstants.MaterializedViewTask.DEFINED_SQL_KEY);
- if (definedSQL == null || definedSQL.isEmpty()) {
- return;
+ } else if (tableConfig != null && tableConfig.isMaterializedView()) {
+ LOGGER.warn("MV table {} dropped without definition metadata;
consistency reverse index may be stale",
+ tableNameWithType);
}
- String sourceTable =
CalciteSqlParser.compileToPinotQuery(definedSQL).getDataSource().getTableName();
- mgr.onMaterializedViewTableDropped(tableNameWithType,
Collections.singletonList(sourceTable));
} catch (Exception e) {
LOGGER.warn("Failed to unregister MV table {} from consistency manager",
tableNameWithType, e);
}
diff --git
a/pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/analysis/MaterializedViewAnalyzer.java
b/pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/analysis/MaterializedViewAnalyzer.java
index 9bae3d479ff..118276b92f9 100644
---
a/pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/analysis/MaterializedViewAnalyzer.java
+++
b/pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/analysis/MaterializedViewAnalyzer.java
@@ -102,6 +102,10 @@ public final class MaterializedViewAnalyzer {
public static AnalysisResult analyze(String definedSql, TableConfig
viewTableConfig, Schema viewSchema,
Map<String, String> taskConfigs, MaterializedViewTaskGeneratorContext
context) {
+ Preconditions.checkState(viewTableConfig.isMaterializedView(),
+ "MaterializedViewAnalyzer requires isMaterializedView=true on table:
%s",
+ viewTableConfig.getTableName());
+
// Step 4 first: cheap config checks that don't require parsing
long bucketMs = validateTaskConfigs(viewTableConfig, taskConfigs, context);
diff --git
a/pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/scheduler/MaterializedViewTaskScheduler.java
b/pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/scheduler/MaterializedViewTaskScheduler.java
index 86708f9f6b2..1f874617ea9 100644
---
a/pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/scheduler/MaterializedViewTaskScheduler.java
+++
b/pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/scheduler/MaterializedViewTaskScheduler.java
@@ -129,6 +129,12 @@ public class MaterializedViewTaskScheduler {
for (TableConfig tableConfig : tableConfigs) {
String offlineTableName = tableConfig.getTableName();
+ if (!tableConfig.isMaterializedView()) {
+ LOGGER.warn("Skip generating task: {} for table: {} because
isMaterializedView is not set",
+ taskType, offlineTableName);
+ continue;
+ }
+
if (tableConfig.getTableType() != TableType.OFFLINE) {
LOGGER.warn("Skip generating task: {} for non-OFFLINE table: {}",
taskType, offlineTableName);
continue;
diff --git
a/pinot-materialized-view/src/test/java/org/apache/pinot/materializedview/analysis/MaterializedViewAnalyzerTest.java
b/pinot-materialized-view/src/test/java/org/apache/pinot/materializedview/analysis/MaterializedViewAnalyzerTest.java
index dd76834650b..f5ab621e90e 100644
---
a/pinot-materialized-view/src/test/java/org/apache/pinot/materializedview/analysis/MaterializedViewAnalyzerTest.java
+++
b/pinot-materialized-view/src/test/java/org/apache/pinot/materializedview/analysis/MaterializedViewAnalyzerTest.java
@@ -165,6 +165,7 @@ public class MaterializedViewAnalyzerTest {
// point to the SELECT alias 'dayBucket' — not the inherited base name.
TableConfig viewTableConfig = new TableConfigBuilder(TableType.OFFLINE)
.setTableName("mv_orders")
+ .setIsMaterializedView(true)
.setTimeColumnName("dayBucket")
.build();
Map<String, String> taskConfigs = buildTaskConfigs(sql);
@@ -653,6 +654,30 @@ public class MaterializedViewAnalyzerTest {
// Step 4: Task config parameter validation
// -----------------------------------------------------------------------
+ @Test
+ public void testAnalyzeRequiresIsMaterializedViewFlag() {
+ String sql = "SELECT DaysSinceEpoch, city, count(*) AS cnt FROM orders
GROUP BY DaysSinceEpoch, city";
+ Schema viewSchema = new Schema.SchemaBuilder()
+ .addSingleValueDimension("city", FieldSpec.DataType.STRING)
+ .addMetric("cnt", FieldSpec.DataType.LONG)
+ .addDateTime(TIME_COLUMN, FieldSpec.DataType.TIMESTAMP,
"1:MILLISECONDS:TIMESTAMP", "1:MILLISECONDS")
+ .build();
+
+ TableConfig viewTableConfig = new TableConfigBuilder(TableType.OFFLINE)
+ .setTableName("mv_orders")
+ .setTimeColumnName(TIME_COLUMN)
+ .build();
+ Map<String, String> taskConfigs = buildTaskConfigs(sql);
+
+ try {
+ MaterializedViewAnalyzer.analyze(withLimit(sql), viewTableConfig,
viewSchema, taskConfigs, _mockAccessor);
+ fail("Expected IllegalStateException when isMaterializedView is false");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("isMaterializedView=true"),
+ "Unexpected message: " + e.getMessage());
+ }
+ }
+
@Test
public void testNonOfflineTableType() {
String sql = "SELECT DaysSinceEpoch, city, count(*) AS cnt FROM orders
GROUP BY DaysSinceEpoch, city";
@@ -664,6 +689,7 @@ public class MaterializedViewAnalyzerTest {
TableConfig realtimeConfig = new TableConfigBuilder(TableType.REALTIME)
.setTableName("mv_orders")
+ .setIsMaterializedView(true)
.setTimeColumnName(TIME_COLUMN)
.build();
Map<String, String> taskConfigs = buildTaskConfigs(sql);
@@ -800,6 +826,7 @@ public class MaterializedViewAnalyzerTest {
TableConfig viewTableConfig = new TableConfigBuilder(TableType.OFFLINE)
.setTableName("mv_orders")
+ .setIsMaterializedView(true)
.build();
Map<String, String> taskConfigs = buildTaskConfigs(sql);
@@ -824,6 +851,7 @@ public class MaterializedViewAnalyzerTest {
// timeColumnName points to a column that doesn't exist in the MV schema
at all.
TableConfig viewTableConfig = new TableConfigBuilder(TableType.OFFLINE)
.setTableName("mv_orders")
+ .setIsMaterializedView(true)
.setTimeColumnName("nonexistent_time_col")
.build();
Map<String, String> taskConfigs = buildTaskConfigs(sql);
@@ -849,6 +877,7 @@ public class MaterializedViewAnalyzerTest {
// timeColumnName points to a plain dimension, not a registered dateTime
column.
TableConfig viewTableConfig = new TableConfigBuilder(TableType.OFFLINE)
.setTableName("mv_orders")
+ .setIsMaterializedView(true)
.setTimeColumnName("city")
.build();
Map<String, String> taskConfigs = buildTaskConfigs(sql);
@@ -878,6 +907,7 @@ public class MaterializedViewAnalyzerTest {
TableConfig viewTableConfig = new TableConfigBuilder(TableType.OFFLINE)
.setTableName("mv_orders")
+ .setIsMaterializedView(true)
.setTimeColumnName(TIME_COLUMN)
.build();
Map<String, String> taskConfigs = buildTaskConfigs(sql);
@@ -907,6 +937,7 @@ public class MaterializedViewAnalyzerTest {
TableConfig viewTableConfig = new TableConfigBuilder(TableType.OFFLINE)
.setTableName("mv_orders")
+ .setIsMaterializedView(true)
.setTimeColumnName("day")
.build();
Map<String, String> taskConfigs = buildTaskConfigs(sql);
@@ -978,6 +1009,7 @@ public class MaterializedViewAnalyzerTest {
TableConfig viewTableConfig = new TableConfigBuilder(TableType.OFFLINE)
.setTableName("mv_orders")
+ .setIsMaterializedView(true)
.setTimeColumnName("day")
.build();
Map<String, String> taskConfigs = buildTaskConfigs(sql);
@@ -1000,6 +1032,7 @@ public class MaterializedViewAnalyzerTest {
TableConfig viewTableConfig = new TableConfigBuilder(TableType.OFFLINE)
.setTableName("mv_orders")
+ .setIsMaterializedView(true)
.setTimeColumnName("hr")
.build();
Map<String, String> taskConfigs = buildTaskConfigs(sql);
@@ -1020,6 +1053,7 @@ public class MaterializedViewAnalyzerTest {
TableConfig viewTableConfig = new TableConfigBuilder(TableType.OFFLINE)
.setTableName("mv_orders")
+ .setIsMaterializedView(true)
.setTimeColumnName("ts_ms")
.build();
Map<String, String> taskConfigs = buildTaskConfigs(sql);
@@ -1041,6 +1075,7 @@ public class MaterializedViewAnalyzerTest {
TableConfig viewTableConfig = new TableConfigBuilder(TableType.OFFLINE)
.setTableName("mv_orders")
+ .setIsMaterializedView(true)
.setTimeColumnName("day")
.build();
Map<String, String> taskConfigs = buildTaskConfigs(sql);
@@ -1057,6 +1092,7 @@ public class MaterializedViewAnalyzerTest {
private TableConfig buildMaterializedViewTableConfig() {
return new TableConfigBuilder(TableType.OFFLINE)
.setTableName("mv_orders")
+ .setIsMaterializedView(true)
.setTimeColumnName(TIME_COLUMN)
.build();
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index ea9ad0615a7..84d8f7978ff 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -196,6 +196,7 @@ public final class TableConfigUtils {
}
validateTaskConfig(tableConfig);
+ validateMaterializedViewInvariants(tableConfig);
if (_enforcePoolBasedAssignment) {
validateInstancePoolsAndReplicaGroups(tableConfig);
@@ -1300,6 +1301,7 @@ public final class TableConfigUtils {
List<String> violations = new ArrayList<>();
validateUpsertConfigUpdate(newConfig, existingConfig, violations);
validateDedupConfigUpdate(newConfig, existingConfig, violations);
+ validateMaterializedViewConfigUpdate(newConfig, existingConfig,
violations);
return violations;
}
@@ -1423,6 +1425,73 @@ public final class TableConfigUtils {
}
}
+ /**
+ * Validates materialized-view table identity and task-config consistency.
+ * Identity is declared only via {@link TableConfig#isMaterializedView()};
task configs alone do not
+ * make a table an MV.
+ */
+ @VisibleForTesting
+ static void validateMaterializedViewInvariants(TableConfig tableConfig) {
+ boolean isMaterializedView = tableConfig.isMaterializedView();
+ boolean hasMvTaskWithDefinedSql =
tableConfig.hasMaterializedViewTaskWithDefinedSql();
+ boolean hasMvTask = tableConfig.getMaterializedViewTaskConfigs() != null;
+
+ if (isMaterializedView) {
+ Preconditions.checkState(tableConfig.getTableType() == TableType.OFFLINE,
+ "Materialized view tables must be OFFLINE, got: %s for table: %s",
tableConfig.getTableType(),
+ tableConfig.getTableName());
+ Preconditions.checkState(hasMvTaskWithDefinedSql,
+ "isMaterializedView is true but MaterializedViewTask with non-empty
definedSQL is required for table: %s",
+ tableConfig.getTableName());
+ Preconditions.checkState(!tableConfig.isDimTable(),
+ "A table cannot be both isDimTable and isMaterializedView: %s",
tableConfig.getTableName());
+ }
+
+ if (hasMvTaskWithDefinedSql && !isMaterializedView) {
+ throw new IllegalStateException(String.format(
+ "MaterializedViewTask is configured but isMaterializedView is not
true for table: %s. "
+ + "Set \"isMaterializedView\": true or remove
MaterializedViewTask.",
+ tableConfig.getTableName()));
+ }
+
+ if (hasMvTask && !hasMvTaskWithDefinedSql) {
+ throw new IllegalStateException(String.format(
+ "MaterializedViewTask is configured but definedSQL is missing or
empty for table: %s",
+ tableConfig.getTableName()));
+ }
+ }
+
+ private static void validateMaterializedViewConfigUpdate(TableConfig
newConfig, TableConfig existingConfig,
+ List<String> violations) {
+ if (existingConfig.isMaterializedView() != newConfig.isMaterializedView())
{
+ violations.add(String.format("isMaterializedView (%s -> %s) cannot be
changed; drop and recreate the table",
+ existingConfig.isMaterializedView(),
newConfig.isMaterializedView()));
+ }
+
+ if (!existingConfig.isMaterializedView() &&
!newConfig.isMaterializedView()) {
+ return;
+ }
+
+ String existingDefinedSql =
getDefinedSqlFromMaterializedViewTask(existingConfig);
+ String newDefinedSql = getDefinedSqlFromMaterializedViewTask(newConfig);
+ if (existingDefinedSql != null && newDefinedSql != null &&
!existingDefinedSql.equals(newDefinedSql)) {
+ violations.add("MaterializedViewTask.definedSQL is immutable after MV
creation");
+ }
+
+ if (existingConfig.isMaterializedView() &&
!newConfig.hasMaterializedViewTaskWithDefinedSql()) {
+ violations.add("MaterializedViewTask with definedSQL cannot be removed
from a materialized view table");
+ }
+ }
+
+ @Nullable
+ private static String getDefinedSqlFromMaterializedViewTask(TableConfig
tableConfig) {
+ Map<String, String> configs = tableConfig.getMaterializedViewTaskConfigs();
+ if (configs == null) {
+ return null;
+ }
+ return
configs.get(org.apache.pinot.spi.utils.CommonConstants.MaterializedViewTask.DEFINED_SQL_KEY);
+ }
+
/**
* Validates task configuration to ensure no conflicting task types are
configured.
*/
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index e5f8e5d51ce..db06da4dabd 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -4261,4 +4261,78 @@ public class TableConfigUtilsTest {
List<String> violations =
TableConfigUtils.validateBackwardCompatibility(newConfig, existingConfig);
assertTrue(violations.isEmpty(), "Expected no violations for non-upsert
tables, but got: " + violations);
}
+
+ @Test
+ public void
testValidateMaterializedViewInvariantsFlagRequiresOfflineAndTask() {
+ TableConfig mvWithoutTask = new
TableConfigBuilder(TableType.OFFLINE).setTableName("mv_test")
+ .setIsMaterializedView(true).build();
+ assertThrows(IllegalStateException.class, () ->
TableConfigUtils.validateMaterializedViewInvariants(mvWithoutTask));
+
+ TableConfig mvRealtime = new
TableConfigBuilder(TableType.REALTIME).setTableName("mv_test")
+ .setIsMaterializedView(true)
+ .setTaskConfig(new
org.apache.pinot.spi.config.table.TableTaskConfig(buildMaterializedViewTaskMap("SELECT
1")))
+ .build();
+ assertThrows(IllegalStateException.class, () ->
TableConfigUtils.validateMaterializedViewInvariants(mvRealtime));
+ }
+
+ @Test
+ public void testValidateMaterializedViewInvariantsTaskWithoutFlagRejected() {
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("mv_test")
+ .setTaskConfig(new org.apache.pinot.spi.config.table.TableTaskConfig(
+ buildMaterializedViewTaskMap("SELECT city FROM orders GROUP BY
city")))
+ .build();
+ assertThrows(IllegalStateException.class,
+ () ->
TableConfigUtils.validateMaterializedViewInvariants(tableConfig));
+ }
+
+ @Test
+ public void
testValidateMaterializedViewInvariantsTaskWithoutDefinedSqlRejected() {
+ Map<String, Map<String, String>> taskTypeConfigsMap = new HashMap<>();
+
taskTypeConfigsMap.put(org.apache.pinot.spi.utils.CommonConstants.MaterializedViewTask.TASK_TYPE,
+ Map.of("bucketTimePeriod", "1d"));
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("mv_test")
+ .setTaskConfig(new
org.apache.pinot.spi.config.table.TableTaskConfig(taskTypeConfigsMap))
+ .build();
+ assertThrows(IllegalStateException.class,
+ () ->
TableConfigUtils.validateMaterializedViewInvariants(tableConfig));
+ }
+
+ @Test
+ public void
testValidateBackwardCompatibilityMaterializedViewDefinedSqlImmutable() {
+ String sql = "SELECT city FROM orders GROUP BY city";
+ TableConfig existingConfig = buildMaterializedViewTableConfig(sql);
+ TableConfig newConfig = buildMaterializedViewTableConfig("SELECT city,
count(*) FROM orders GROUP BY city");
+
+ List<String> violations =
TableConfigUtils.validateBackwardCompatibility(newConfig, existingConfig);
+ assertEquals(violations.size(), 1);
+ assertTrue(violations.get(0).contains("definedSQL is immutable"));
+ }
+
+ @Test
+ public void
testValidateBackwardCompatibilityMaterializedViewFlagCannotChange() {
+ String sql = "SELECT city FROM orders GROUP BY city";
+ TableConfig existingConfig = buildMaterializedViewTableConfig(sql);
+ TableConfig newConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("mv_test")
+ .setTaskConfig(new
org.apache.pinot.spi.config.table.TableTaskConfig(buildMaterializedViewTaskMap(sql)))
+ .build();
+
+ List<String> violations =
TableConfigUtils.validateBackwardCompatibility(newConfig, existingConfig);
+ assertEquals(violations.size(), 1);
+ assertTrue(violations.get(0).contains("isMaterializedView"));
+ }
+
+ private static Map<String, Map<String, String>>
buildMaterializedViewTaskMap(String definedSql) {
+ Map<String, String> mvTaskConfig = new HashMap<>();
+
mvTaskConfig.put(org.apache.pinot.spi.utils.CommonConstants.MaterializedViewTask.DEFINED_SQL_KEY,
definedSql);
+ Map<String, Map<String, String>> taskTypeConfigsMap = new HashMap<>();
+
taskTypeConfigsMap.put(org.apache.pinot.spi.utils.CommonConstants.MaterializedViewTask.TASK_TYPE,
mvTaskConfig);
+ return taskTypeConfigsMap;
+ }
+
+ private static TableConfig buildMaterializedViewTableConfig(String
definedSql) {
+ return new TableConfigBuilder(TableType.OFFLINE).setTableName("mv_test")
+ .setIsMaterializedView(true)
+ .setTaskConfig(new
org.apache.pinot.spi.config.table.TableTaskConfig(buildMaterializedViewTaskMap(definedSql)))
+ .build();
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index 7d214bf8d67..122e1784400 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -37,6 +37,7 @@ import
org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.table.assignment.SegmentAssignmentConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -45,6 +46,7 @@ public class TableConfig extends BaseJsonConfig {
public static final String TABLE_NAME_KEY = "tableName";
public static final String TABLE_TYPE_KEY = "tableType";
public static final String IS_DIM_TABLE_KEY = "isDimTable";
+ public static final String IS_MATERIALIZED_VIEW_KEY = "isMaterializedView";
public static final String VALIDATION_CONFIG_KEY = "segmentsConfig";
public static final String TENANT_CONFIG_KEY = "tenants";
public static final String INDEXING_CONFIG_KEY = "tableIndexConfig";
@@ -82,6 +84,9 @@ public class TableConfig extends BaseJsonConfig {
@JsonPropertyDescription("Indicates whether the table is a dimension table
or not")
private final boolean _dimTable;
+ @JsonPropertyDescription("Indicates whether the table is a materialized view
or not")
+ private final boolean _materializedView;
+
private SegmentsValidationAndRetentionConfig _validationConfig;
private TenantConfig _tenantConfig;
private IndexingConfig _indexingConfig;
@@ -134,6 +139,30 @@ public class TableConfig extends BaseJsonConfig {
@JsonPropertyDescription(value = "Configs for table samplers")
private List<TableSamplerConfig> _tableSamplers;
+ /// Legacy constructor preserved for binary backward-compatibility on the
public SPI surface.
+ /// Callers compiled against the pre-`isMaterializedView` signature still
link against this entry
+ /// point; it forwards to the canonical constructor with
`materializedView=false`. Prefer the
+ /// {@link TableConfigBuilder} or the canonical constructor below for new
code.
+ @Deprecated
+ public TableConfig(String tableName, String tableType,
+ SegmentsValidationAndRetentionConfig validationConfig, TenantConfig
tenantConfig,
+ IndexingConfig indexingConfig, TableCustomConfig customConfig, @Nullable
QuotaConfig quotaConfig,
+ @Nullable TableTaskConfig taskConfig, @Nullable RoutingConfig
routingConfig,
+ @Nullable QueryConfig queryConfig,
+ @Nullable Map<String, InstanceAssignmentConfig>
instanceAssignmentConfigMap,
+ @Nullable List<FieldConfig> fieldConfigList, @Nullable UpsertConfig
upsertConfig,
+ @Nullable DedupConfig dedupConfig, @Nullable DimensionTableConfig
dimensionTableConfig,
+ @Nullable IngestionConfig ingestionConfig, @Nullable List<TierConfig>
tierConfigsList, boolean dimTable,
+ @Nullable List<TunerConfig> tunerConfigList,
+ @Nullable Map<InstancePartitionsType, String> instancePartitionsMap,
+ @Nullable Map<String, SegmentAssignmentConfig>
segmentAssignmentConfigMap,
+ @Nullable List<TableSamplerConfig> tableSamplers) {
+ this(tableName, tableType, validationConfig, tenantConfig, indexingConfig,
customConfig, quotaConfig, taskConfig,
+ routingConfig, queryConfig, instanceAssignmentConfigMap,
fieldConfigList, upsertConfig, dedupConfig,
+ dimensionTableConfig, ingestionConfig, tierConfigsList, dimTable,
tunerConfigList, instancePartitionsMap,
+ segmentAssignmentConfigMap, tableSamplers, /*materializedView=*/
false);
+ }
+
@JsonCreator
public TableConfig(@JsonProperty(value = TABLE_NAME_KEY, required = true)
String tableName,
@JsonProperty(value = TABLE_TYPE_KEY, required = true) String tableType,
@@ -160,7 +189,8 @@ public class TableConfig extends BaseJsonConfig {
Map<InstancePartitionsType, String> instancePartitionsMap,
@JsonProperty(SEGMENT_ASSIGNMENT_CONFIG_MAP_KEY) @Nullable
Map<String, SegmentAssignmentConfig> segmentAssignmentConfigMap,
- @JsonProperty(TABLE_SAMPLERS_KEY) @Nullable List<TableSamplerConfig>
tableSamplers) {
+ @JsonProperty(TABLE_SAMPLERS_KEY) @Nullable List<TableSamplerConfig>
tableSamplers,
+ @JsonProperty(IS_MATERIALIZED_VIEW_KEY) boolean materializedView) {
Preconditions.checkArgument(tableName != null, "'tableName' must be
configured");
Preconditions.checkArgument(!tableName.contains(TABLE_NAME_FORBIDDEN_SUBSTRING),
"'tableName' cannot contain double underscore ('__')");
@@ -188,6 +218,7 @@ public class TableConfig extends BaseJsonConfig {
_ingestionConfig = ingestionConfig;
_tierConfigsList = tierConfigsList;
_dimTable = dimTable;
+ _materializedView = materializedView;
_tunerConfigList = tunerConfigList;
_instancePartitionsMap = instancePartitionsMap;
_segmentAssignmentConfigMap = segmentAssignmentConfigMap;
@@ -213,6 +244,7 @@ public class TableConfig extends BaseJsonConfig {
_ingestionConfig = tableConfig.getIngestionConfig();
_tierConfigsList = tableConfig.getTierConfigsList();
_dimTable = tableConfig.isDimTable();
+ _materializedView = tableConfig.isMaterializedView();
_tunerConfigList = tableConfig.getTunerConfigsList();
_instancePartitionsMap = tableConfig.getInstancePartitionsMap();
_segmentAssignmentConfigMap = tableConfig.getSegmentAssignmentConfigMap();
@@ -261,6 +293,32 @@ public class TableConfig extends BaseJsonConfig {
return _dimTable;
}
+ @JsonProperty(IS_MATERIALIZED_VIEW_KEY)
+ public boolean isMaterializedView() {
+ return _materializedView;
+ }
+
+ /// Returns task configs for [CommonConstants.MaterializedViewTask#TASK_TYPE], or null if absent.
+ @JsonIgnore
+ @Nullable
+ public Map<String, String> getMaterializedViewTaskConfigs() {
+ if (_taskConfig == null) {
+ return null;
+ }
+ return
_taskConfig.getConfigsForTaskType(CommonConstants.MaterializedViewTask.TASK_TYPE);
+ }
+
+ /// Whether the table has a non-empty `definedSQL` under [CommonConstants.MaterializedViewTask#TASK_TYPE].
+ @JsonIgnore
+ public boolean hasMaterializedViewTaskWithDefinedSql() {
+ Map<String, String> configs = getMaterializedViewTaskConfigs();
+ if (configs == null) {
+ return false;
+ }
+ String definedSql =
configs.get(CommonConstants.MaterializedViewTask.DEFINED_SQL_KEY);
+ return definedSql != null && !definedSql.trim().isEmpty();
+ }
+
@JsonProperty(VALIDATION_CONFIG_KEY)
public SegmentsValidationAndRetentionConfig getValidationConfig() {
return _validationConfig;
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index 3b316d33739..38db0db9025 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -66,6 +66,7 @@ public class TableConfigBuilder {
private final TableType _tableType;
private String _tableName;
private boolean _isDimTable;
+ private boolean _isMaterializedView;
// Segments config related
private String _numReplicas = DEFAULT_NUM_REPLICAS;
@@ -161,6 +162,11 @@ public class TableConfigBuilder {
return this;
}
+ public TableConfigBuilder setIsMaterializedView(boolean isMaterializedView) {
+ _isMaterializedView = isMaterializedView;
+ return this;
+ }
+
public TableConfigBuilder addFieldConfig(FieldConfig config) {
if (_fieldConfigList == null) {
_fieldConfigList = new ArrayList<>();
@@ -568,7 +574,8 @@ public class TableConfigBuilder {
new TableConfig(_tableName, _tableType.toString(), validationConfig,
tenantConfig, indexingConfig,
_customConfig, _quotaConfig, _taskConfig, _routingConfig,
_queryConfig, _instanceAssignmentConfigMap,
_fieldConfigList, _upsertConfig, _dedupConfig,
_dimensionTableConfig, _ingestionConfig, _tierConfigList,
- _isDimTable, _tunerConfigList, _instancePartitionsMap,
_segmentAssignmentConfigMap, _tableSamplers);
+ _isDimTable, _tunerConfigList, _instancePartitionsMap,
_segmentAssignmentConfigMap,
+ _tableSamplers, _isMaterializedView);
tableConfig.setDescription(_description);
tableConfig.setTags(_tags);
return tableConfig;
diff --git
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/PropertyMapping.java
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/PropertyMapping.java
index bc32fd4967c..a1673942d40 100644
---
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/PropertyMapping.java
+++
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/PropertyMapping.java
@@ -300,6 +300,9 @@ public final class PropertyMapping {
case "isdimtable":
builder.setIsDimTable(parseBool(lowerKey, value));
return true;
+ case "ismaterializedview":
+ builder.setIsMaterializedView(parseBool(lowerKey, value));
+ return true;
case "invertedindexcolumns":
builder.setInvertedIndexColumns(splitCsv(value));
return true;
diff --git
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/reverse/PropertyExtractor.java
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/reverse/PropertyExtractor.java
index f33767850a4..b80335e4aad 100644
---
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/reverse/PropertyExtractor.java
+++
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/reverse/PropertyExtractor.java
@@ -332,6 +332,9 @@ public final class PropertyExtractor {
if (config.isDimTable()) {
props.put("isDimTable", "true");
}
+ if (config.isMaterializedView()) {
+ props.put("isMaterializedView", "true");
+ }
putIfPresent(props, "description", config.getDescription());
List<String> tags = config.getTags();
if (tags != null && !tags.isEmpty()) {
diff --git
a/pinot-tools/src/main/resources/examples/batch/airlineStatsMv/airlineStatsMv_offline_table_config.json
b/pinot-tools/src/main/resources/examples/batch/airlineStatsMv/airlineStatsMv_offline_table_config.json
index 26682fe8903..02c3728e9cc 100644
---
a/pinot-tools/src/main/resources/examples/batch/airlineStatsMv/airlineStatsMv_offline_table_config.json
+++
b/pinot-tools/src/main/resources/examples/batch/airlineStatsMv/airlineStatsMv_offline_table_config.json
@@ -1,6 +1,7 @@
{
"tableName": "airlineStatsMv",
"tableType": "OFFLINE",
+ "isMaterializedView": true,
"segmentsConfig": {
"timeColumnName": "tsMs",
"timeType": "MILLISECONDS",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]