This is an automated email from the ASF dual-hosted git repository.

xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6aae074def8 Add isMaterializedView flag on TableConfig as the single 
MV identity (#18564)
6aae074def8 is described below

commit 6aae074def84a46b5ce93821fc3313121b9c9126
Author: Hongkun Xu <[email protected]>
AuthorDate: Fri May 22 16:11:21 2026 +0800

    Add isMaterializedView flag on TableConfig as the single MV identity 
(#18564)
    
    Today, identifying a materialized view (MV) table requires scattered
    heuristics: scanning task configs for MaterializedViewTask, fetching
    the ZK definition znode, or parsing the definedSQL. This makes MV
    identity ambiguous on the create/update path and forces every caller
    that needs to ask "is this an MV?" to repeat the same multi-source
    inference, which is also subject to ordering issues at table-create
    time (the definition znode is created lazily by the scheduler on the
    first task run, not synchronously at table create).
    
    Introduce isMaterializedView on TableConfig as the single source of
    truth for MV identity, modeled on isDimTable:
    
      - pinot-spi: add IS_MATERIALIZED_VIEW_KEY, _materializedView field,
        constructor/copy parameter, getter, and JsonIgnore helpers
        getMaterializedViewTaskConfigs() /
        hasMaterializedViewTaskWithDefinedSql(). TableConfigBuilder gains
        setIsMaterializedView().
      - pinot-common: persist isMaterializedView in the ZN simple fields.
      - pinot-sql-ddl: PropertyMapping accepts ismaterializedview;
        PropertyExtractor emits it on reverse-render.
      - pinot-segment-local TableConfigUtils:
          * validateMaterializedViewInvariants: flag=true requires OFFLINE
            + MaterializedViewTask + non-empty definedSQL, and forbids
            coexistence with isDimTable; MaterializedViewTask without the
            flag (or without definedSQL) is rejected.
          * validateBackwardCompatibility: isMaterializedView cannot be
            toggled; definedSQL is immutable after MV creation;
            MaterializedViewTask cannot be removed from an MV table.
      - pinot-materialized-view:
          * MaterializedViewAnalyzer.analyze now requires the flag.
          * MaterializedViewTaskScheduler skips task generation when the
            flag is false.
      - pinot-controller PinotHelixResourceManager: both the table-create
        and table-drop MV-consistency notifications are now gated on
        tableConfig.isMaterializedView(); the drop path no longer falls
        back to task-config scan or CalciteSqlParser to infer MV identity.
      - airlineStatsMv example table config sets isMaterializedView: true.
    
    Tests:
      - MaterializedViewAnalyzerTest: every MV TableConfigBuilder now sets
        setIsMaterializedView(true); new test asserts analyze fails when
        the flag is missing.
      - TableConfigUtilsTest: new tests cover the four
        validateMaterializedViewInvariants branches and the
        definedSQL-immutable / flag-cannot-change backward-compatibility
        rules; existing positional TableConfig constructor call sites
        updated for the new parameter.
      - PinotDataWriter.scala (spark-3 connector) dummy TableConfig
        constructor updated for the new positional parameter.
    
    This is intentionally identity-only: the existing ZK Definition and
    Runtime znodes still drive the MV runtime view (watermark, partition
    state) and the /materializedViews REST listing endpoint; aligning
    those with the flag is left to a follow-up.
    
    Signed-off-by: Hongkun Xu <[email protected]>
    Co-authored-by: Xiang Fu <[email protected]>
    Co-authored-by: Cursor <[email protected]>
---
 .../common/utils/config/TableConfigSerDeUtils.java |  6 +-
 .../helix/core/PinotHelixResourceManager.java      | 46 +++++---------
 .../analysis/MaterializedViewAnalyzer.java         |  4 ++
 .../scheduler/MaterializedViewTaskScheduler.java   |  6 ++
 .../analysis/MaterializedViewAnalyzerTest.java     | 36 +++++++++++
 .../segment/local/utils/TableConfigUtils.java      | 69 ++++++++++++++++++++
 .../segment/local/utils/TableConfigUtilsTest.java  | 74 ++++++++++++++++++++++
 .../apache/pinot/spi/config/table/TableConfig.java | 60 +++++++++++++++++-
 .../spi/utils/builder/TableConfigBuilder.java      |  9 ++-
 .../pinot/sql/ddl/compile/PropertyMapping.java     |  3 +
 .../pinot/sql/ddl/reverse/PropertyExtractor.java   |  3 +
 .../airlineStatsMv_offline_table_config.json       |  1 +
 12 files changed, 285 insertions(+), 32 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtils.java
index a24583891fd..436e58f3ae5 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtils.java
@@ -66,6 +66,8 @@ public class TableConfigSerDeUtils {
 
     String tableType = simpleFields.get(TableConfig.TABLE_TYPE_KEY);
     boolean isDimTable = 
Boolean.parseBoolean(simpleFields.get(TableConfig.IS_DIM_TABLE_KEY));
+    boolean isMaterializedView =
+        
Boolean.parseBoolean(simpleFields.get(TableConfig.IS_MATERIALIZED_VIEW_KEY));
     Preconditions.checkState(tableType != null, 
FIELD_MISSING_MESSAGE_TEMPLATE, TableConfig.TABLE_TYPE_KEY);
 
     String validationConfigString = 
simpleFields.get(TableConfig.VALIDATION_CONFIG_KEY);
@@ -198,7 +200,8 @@ public class TableConfigSerDeUtils {
         new TableConfig(tableName, tableType, validationConfig, tenantConfig, 
indexingConfig, customConfig,
             quotaConfig, taskConfig, routingConfig, queryConfig, 
instanceAssignmentConfigMap, fieldConfigList,
             upsertConfig, dedupConfig, dimensionTableConfig, ingestionConfig, 
tierConfigList, isDimTable,
-            tunerConfigList, instancePartitionsMap, 
segmentAssignmentConfigMap, tableSamplerConfigs);
+            tunerConfigList, instancePartitionsMap, segmentAssignmentConfigMap,
+            tableSamplerConfigs, isMaterializedView);
     tableConfig.setDescription(description);
     tableConfig.setTags(tags);
     return tableConfig;
@@ -216,6 +219,7 @@ public class TableConfigSerDeUtils {
     simpleFields.put(TableConfig.INDEXING_CONFIG_KEY, 
tableConfig.getIndexingConfig().toJsonString());
     simpleFields.put(TableConfig.CUSTOM_CONFIG_KEY, 
tableConfig.getCustomConfig().toJsonString());
     simpleFields.put(TableConfig.IS_DIM_TABLE_KEY, 
Boolean.toString(tableConfig.isDimTable()));
+    simpleFields.put(TableConfig.IS_MATERIALIZED_VIEW_KEY, 
Boolean.toString(tableConfig.isMaterializedView()));
 
     // Optional fields
     QuotaConfig quotaConfig = tableConfig.getQuotaConfig();
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index dc4d50e5a1c..d5cb2b1d97f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -163,6 +163,7 @@ import 
org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
 import org.apache.pinot.controller.workload.QueryWorkloadManager;
 import org.apache.pinot.core.util.NumberUtils;
 import org.apache.pinot.core.util.NumericException;
+import org.apache.pinot.materializedview.analysis.MaterializedViewAnalyzer;
 import 
org.apache.pinot.materializedview.consistency.MaterializedViewConsistencyManager;
 import 
org.apache.pinot.materializedview.metadata.MaterializedViewDefinitionMetadata;
 import 
org.apache.pinot.materializedview.metadata.MaterializedViewDefinitionMetadataUtils;
@@ -174,7 +175,6 @@ import org.apache.pinot.spi.config.instance.Instance;
 import org.apache.pinot.spi.config.instance.InstanceConfigValidatorRegistry;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableStatsHumanReadable;
-import org.apache.pinot.spi.config.table.TableTaskConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TagOverrideConfig;
 import org.apache.pinot.spi.config.table.TenantConfig;
@@ -205,7 +205,6 @@ import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
 import org.apache.pinot.spi.utils.retry.RetryPolicy;
-import org.apache.pinot.sql.parsers.CalciteSqlParser;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -5377,24 +5376,27 @@ public class PinotHelixResourceManager {
 
   private void 
notifyMaterializedViewConsistencyManagerForTableCreate(TableConfig tableConfig) 
{
     MaterializedViewConsistencyManager mgr = 
_materializedViewConsistencyManager;
-    if (mgr == null) {
+    if (mgr == null || !tableConfig.isMaterializedView()) {
       return;
     }
     try {
-      TableTaskConfig taskConfig = tableConfig.getTaskConfig();
-      if (taskConfig == null) {
+      MaterializedViewDefinitionMetadata definition =
+          MaterializedViewDefinitionMetadataUtils.fetch(_propertyStore, 
tableConfig.getTableName());
+      if (definition != null && definition.getBaseTables() != null && 
!definition.getBaseTables().isEmpty()) {
+        mgr.onMaterializedViewTableCreated(tableConfig.getTableName(), 
definition.getBaseTables());
         return;
       }
-      Map<String, String> materializedViewTaskConfigs =
-          
taskConfig.getConfigsForTaskType(CommonConstants.MaterializedViewTask.TASK_TYPE);
+      Map<String, String> materializedViewTaskConfigs = 
tableConfig.getMaterializedViewTaskConfigs();
       if (materializedViewTaskConfigs == null) {
+        LOGGER.warn("MV table {} has no MaterializedViewTask config for 
consistency registration",
+            tableConfig.getTableName());
         return;
       }
       String definedSQL = 
materializedViewTaskConfigs.get(CommonConstants.MaterializedViewTask.DEFINED_SQL_KEY);
       if (definedSQL == null || definedSQL.isEmpty()) {
         return;
       }
-      String sourceTable = 
CalciteSqlParser.compileToPinotQuery(definedSQL).getDataSource().getTableName();
+      String sourceTable = 
MaterializedViewAnalyzer.extractSourceTableName(definedSQL);
       mgr.onMaterializedViewTableCreated(tableConfig.getTableName(), 
Collections.singletonList(sourceTable));
     } catch (Exception e) {
       LOGGER.warn("Failed to register MV table {} with consistency manager", 
tableConfig.getTableName(), e);
@@ -5407,33 +5409,19 @@ public class PinotHelixResourceManager {
       return;
     }
     try {
+      TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+      if (tableConfig != null && !tableConfig.isMaterializedView()) {
+        return;
+      }
       MaterializedViewDefinitionMetadata materializedViewDefinition =
           MaterializedViewDefinitionMetadataUtils.fetch(_propertyStore, 
tableNameWithType);
       if (materializedViewDefinition != null && 
materializedViewDefinition.getBaseTables() != null
           && !materializedViewDefinition.getBaseTables().isEmpty()) {
         mgr.onMaterializedViewTableDropped(tableNameWithType, 
materializedViewDefinition.getBaseTables());
-        return;
-      }
-      // Fall back to reading source table from table task config
-      TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
-      if (tableConfig == null) {
-        return;
-      }
-      TableTaskConfig taskConfig = tableConfig.getTaskConfig();
-      if (taskConfig == null) {
-        return;
-      }
-      Map<String, String> materializedViewTaskConfigs =
-          
taskConfig.getConfigsForTaskType(CommonConstants.MaterializedViewTask.TASK_TYPE);
-      if (materializedViewTaskConfigs == null) {
-        return;
-      }
-      String definedSQL = 
materializedViewTaskConfigs.get(CommonConstants.MaterializedViewTask.DEFINED_SQL_KEY);
-      if (definedSQL == null || definedSQL.isEmpty()) {
-        return;
+      } else if (tableConfig != null && tableConfig.isMaterializedView()) {
+        LOGGER.warn("MV table {} dropped without definition metadata; 
consistency reverse index may be stale",
+            tableNameWithType);
       }
-      String sourceTable = 
CalciteSqlParser.compileToPinotQuery(definedSQL).getDataSource().getTableName();
-      mgr.onMaterializedViewTableDropped(tableNameWithType, 
Collections.singletonList(sourceTable));
     } catch (Exception e) {
       LOGGER.warn("Failed to unregister MV table {} from consistency manager", 
tableNameWithType, e);
     }
diff --git 
a/pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/analysis/MaterializedViewAnalyzer.java
 
b/pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/analysis/MaterializedViewAnalyzer.java
index 9bae3d479ff..118276b92f9 100644
--- 
a/pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/analysis/MaterializedViewAnalyzer.java
+++ 
b/pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/analysis/MaterializedViewAnalyzer.java
@@ -102,6 +102,10 @@ public final class MaterializedViewAnalyzer {
   public static AnalysisResult analyze(String definedSql, TableConfig 
viewTableConfig, Schema viewSchema,
       Map<String, String> taskConfigs, MaterializedViewTaskGeneratorContext 
context) {
 
+    Preconditions.checkState(viewTableConfig.isMaterializedView(),
+        "MaterializedViewAnalyzer requires isMaterializedView=true on table: 
%s",
+        viewTableConfig.getTableName());
+
     // Step 4 first: cheap config checks that don't require parsing
     long bucketMs = validateTaskConfigs(viewTableConfig, taskConfigs, context);
 
diff --git 
a/pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/scheduler/MaterializedViewTaskScheduler.java
 
b/pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/scheduler/MaterializedViewTaskScheduler.java
index 86708f9f6b2..1f874617ea9 100644
--- 
a/pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/scheduler/MaterializedViewTaskScheduler.java
+++ 
b/pinot-materialized-view/src/main/java/org/apache/pinot/materializedview/scheduler/MaterializedViewTaskScheduler.java
@@ -129,6 +129,12 @@ public class MaterializedViewTaskScheduler {
     for (TableConfig tableConfig : tableConfigs) {
       String offlineTableName = tableConfig.getTableName();
 
+      if (!tableConfig.isMaterializedView()) {
+        LOGGER.warn("Skip generating task: {} for table: {} because 
isMaterializedView is not set",
+            taskType, offlineTableName);
+        continue;
+      }
+
       if (tableConfig.getTableType() != TableType.OFFLINE) {
         LOGGER.warn("Skip generating task: {} for non-OFFLINE table: {}", 
taskType, offlineTableName);
         continue;
diff --git 
a/pinot-materialized-view/src/test/java/org/apache/pinot/materializedview/analysis/MaterializedViewAnalyzerTest.java
 
b/pinot-materialized-view/src/test/java/org/apache/pinot/materializedview/analysis/MaterializedViewAnalyzerTest.java
index dd76834650b..f5ab621e90e 100644
--- 
a/pinot-materialized-view/src/test/java/org/apache/pinot/materializedview/analysis/MaterializedViewAnalyzerTest.java
+++ 
b/pinot-materialized-view/src/test/java/org/apache/pinot/materializedview/analysis/MaterializedViewAnalyzerTest.java
@@ -165,6 +165,7 @@ public class MaterializedViewAnalyzerTest {
     // point to the SELECT alias 'dayBucket' — not the inherited base name.
     TableConfig viewTableConfig = new TableConfigBuilder(TableType.OFFLINE)
         .setTableName("mv_orders")
+        .setIsMaterializedView(true)
         .setTimeColumnName("dayBucket")
         .build();
     Map<String, String> taskConfigs = buildTaskConfigs(sql);
@@ -653,6 +654,30 @@ public class MaterializedViewAnalyzerTest {
   //  Step 4: Task config parameter validation
   // -----------------------------------------------------------------------
 
+  @Test
+  public void testAnalyzeRequiresIsMaterializedViewFlag() {
+    String sql = "SELECT DaysSinceEpoch, city, count(*) AS cnt FROM orders 
GROUP BY DaysSinceEpoch, city";
+    Schema viewSchema = new Schema.SchemaBuilder()
+        .addSingleValueDimension("city", FieldSpec.DataType.STRING)
+        .addMetric("cnt", FieldSpec.DataType.LONG)
+        .addDateTime(TIME_COLUMN, FieldSpec.DataType.TIMESTAMP, 
"1:MILLISECONDS:TIMESTAMP", "1:MILLISECONDS")
+        .build();
+
+    TableConfig viewTableConfig = new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName("mv_orders")
+        .setTimeColumnName(TIME_COLUMN)
+        .build();
+    Map<String, String> taskConfigs = buildTaskConfigs(sql);
+
+    try {
+      MaterializedViewAnalyzer.analyze(withLimit(sql), viewTableConfig, 
viewSchema, taskConfigs, _mockAccessor);
+      fail("Expected IllegalStateException when isMaterializedView is false");
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains("isMaterializedView=true"),
+          "Unexpected message: " + e.getMessage());
+    }
+  }
+
   @Test
   public void testNonOfflineTableType() {
     String sql = "SELECT DaysSinceEpoch, city, count(*) AS cnt FROM orders 
GROUP BY DaysSinceEpoch, city";
@@ -664,6 +689,7 @@ public class MaterializedViewAnalyzerTest {
 
     TableConfig realtimeConfig = new TableConfigBuilder(TableType.REALTIME)
         .setTableName("mv_orders")
+        .setIsMaterializedView(true)
         .setTimeColumnName(TIME_COLUMN)
         .build();
     Map<String, String> taskConfigs = buildTaskConfigs(sql);
@@ -800,6 +826,7 @@ public class MaterializedViewAnalyzerTest {
 
     TableConfig viewTableConfig = new TableConfigBuilder(TableType.OFFLINE)
         .setTableName("mv_orders")
+        .setIsMaterializedView(true)
         .build();
     Map<String, String> taskConfigs = buildTaskConfigs(sql);
 
@@ -824,6 +851,7 @@ public class MaterializedViewAnalyzerTest {
     // timeColumnName points to a column that doesn't exist in the MV schema 
at all.
     TableConfig viewTableConfig = new TableConfigBuilder(TableType.OFFLINE)
         .setTableName("mv_orders")
+        .setIsMaterializedView(true)
         .setTimeColumnName("nonexistent_time_col")
         .build();
     Map<String, String> taskConfigs = buildTaskConfigs(sql);
@@ -849,6 +877,7 @@ public class MaterializedViewAnalyzerTest {
     // timeColumnName points to a plain dimension, not a registered dateTime 
column.
     TableConfig viewTableConfig = new TableConfigBuilder(TableType.OFFLINE)
         .setTableName("mv_orders")
+        .setIsMaterializedView(true)
         .setTimeColumnName("city")
         .build();
     Map<String, String> taskConfigs = buildTaskConfigs(sql);
@@ -878,6 +907,7 @@ public class MaterializedViewAnalyzerTest {
 
     TableConfig viewTableConfig = new TableConfigBuilder(TableType.OFFLINE)
         .setTableName("mv_orders")
+        .setIsMaterializedView(true)
         .setTimeColumnName(TIME_COLUMN)
         .build();
     Map<String, String> taskConfigs = buildTaskConfigs(sql);
@@ -907,6 +937,7 @@ public class MaterializedViewAnalyzerTest {
 
     TableConfig viewTableConfig = new TableConfigBuilder(TableType.OFFLINE)
         .setTableName("mv_orders")
+        .setIsMaterializedView(true)
         .setTimeColumnName("day")
         .build();
     Map<String, String> taskConfigs = buildTaskConfigs(sql);
@@ -978,6 +1009,7 @@ public class MaterializedViewAnalyzerTest {
 
     TableConfig viewTableConfig = new TableConfigBuilder(TableType.OFFLINE)
         .setTableName("mv_orders")
+        .setIsMaterializedView(true)
         .setTimeColumnName("day")
         .build();
     Map<String, String> taskConfigs = buildTaskConfigs(sql);
@@ -1000,6 +1032,7 @@ public class MaterializedViewAnalyzerTest {
 
     TableConfig viewTableConfig = new TableConfigBuilder(TableType.OFFLINE)
         .setTableName("mv_orders")
+        .setIsMaterializedView(true)
         .setTimeColumnName("hr")
         .build();
     Map<String, String> taskConfigs = buildTaskConfigs(sql);
@@ -1020,6 +1053,7 @@ public class MaterializedViewAnalyzerTest {
 
     TableConfig viewTableConfig = new TableConfigBuilder(TableType.OFFLINE)
         .setTableName("mv_orders")
+        .setIsMaterializedView(true)
         .setTimeColumnName("ts_ms")
         .build();
     Map<String, String> taskConfigs = buildTaskConfigs(sql);
@@ -1041,6 +1075,7 @@ public class MaterializedViewAnalyzerTest {
 
     TableConfig viewTableConfig = new TableConfigBuilder(TableType.OFFLINE)
         .setTableName("mv_orders")
+        .setIsMaterializedView(true)
         .setTimeColumnName("day")
         .build();
     Map<String, String> taskConfigs = buildTaskConfigs(sql);
@@ -1057,6 +1092,7 @@ public class MaterializedViewAnalyzerTest {
   private TableConfig buildMaterializedViewTableConfig() {
     return new TableConfigBuilder(TableType.OFFLINE)
         .setTableName("mv_orders")
+        .setIsMaterializedView(true)
         .setTimeColumnName(TIME_COLUMN)
         .build();
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index ea9ad0615a7..84d8f7978ff 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -196,6 +196,7 @@ public final class TableConfigUtils {
     }
 
     validateTaskConfig(tableConfig);
+    validateMaterializedViewInvariants(tableConfig);
 
     if (_enforcePoolBasedAssignment) {
       validateInstancePoolsAndReplicaGroups(tableConfig);
@@ -1300,6 +1301,7 @@ public final class TableConfigUtils {
     List<String> violations = new ArrayList<>();
     validateUpsertConfigUpdate(newConfig, existingConfig, violations);
     validateDedupConfigUpdate(newConfig, existingConfig, violations);
+    validateMaterializedViewConfigUpdate(newConfig, existingConfig, 
violations);
 
     return violations;
   }
@@ -1423,6 +1425,73 @@ public final class TableConfigUtils {
     }
   }
 
+  /**
+   * Validates materialized-view table identity and task-config consistency.
+   * Identity is declared only via {@link TableConfig#isMaterializedView()}; 
task configs alone do not
+   * make a table an MV.
+   */
+  @VisibleForTesting
+  static void validateMaterializedViewInvariants(TableConfig tableConfig) {
+    boolean isMaterializedView = tableConfig.isMaterializedView();
+    boolean hasMvTaskWithDefinedSql = 
tableConfig.hasMaterializedViewTaskWithDefinedSql();
+    boolean hasMvTask = tableConfig.getMaterializedViewTaskConfigs() != null;
+
+    if (isMaterializedView) {
+      Preconditions.checkState(tableConfig.getTableType() == TableType.OFFLINE,
+          "Materialized view tables must be OFFLINE, got: %s for table: %s", 
tableConfig.getTableType(),
+          tableConfig.getTableName());
+      Preconditions.checkState(hasMvTaskWithDefinedSql,
+          "isMaterializedView is true but MaterializedViewTask with non-empty 
definedSQL is required for table: %s",
+          tableConfig.getTableName());
+      Preconditions.checkState(!tableConfig.isDimTable(),
+          "A table cannot be both isDimTable and isMaterializedView: %s", 
tableConfig.getTableName());
+    }
+
+    if (hasMvTaskWithDefinedSql && !isMaterializedView) {
+      throw new IllegalStateException(String.format(
+          "MaterializedViewTask is configured but isMaterializedView is not 
true for table: %s. "
+              + "Set \"isMaterializedView\": true or remove 
MaterializedViewTask.",
+          tableConfig.getTableName()));
+    }
+
+    if (hasMvTask && !hasMvTaskWithDefinedSql) {
+      throw new IllegalStateException(String.format(
+          "MaterializedViewTask is configured but definedSQL is missing or 
empty for table: %s",
+          tableConfig.getTableName()));
+    }
+  }
+
+  private static void validateMaterializedViewConfigUpdate(TableConfig 
newConfig, TableConfig existingConfig,
+      List<String> violations) {
+    if (existingConfig.isMaterializedView() != newConfig.isMaterializedView()) 
{
+      violations.add(String.format("isMaterializedView (%s -> %s) cannot be 
changed; drop and recreate the table",
+          existingConfig.isMaterializedView(), 
newConfig.isMaterializedView()));
+    }
+
+    if (!existingConfig.isMaterializedView() && 
!newConfig.isMaterializedView()) {
+      return;
+    }
+
+    String existingDefinedSql = 
getDefinedSqlFromMaterializedViewTask(existingConfig);
+    String newDefinedSql = getDefinedSqlFromMaterializedViewTask(newConfig);
+    if (existingDefinedSql != null && newDefinedSql != null && 
!existingDefinedSql.equals(newDefinedSql)) {
+      violations.add("MaterializedViewTask.definedSQL is immutable after MV 
creation");
+    }
+
+    if (existingConfig.isMaterializedView() && 
!newConfig.hasMaterializedViewTaskWithDefinedSql()) {
+      violations.add("MaterializedViewTask with definedSQL cannot be removed 
from a materialized view table");
+    }
+  }
+
+  @Nullable
+  private static String getDefinedSqlFromMaterializedViewTask(TableConfig 
tableConfig) {
+    Map<String, String> configs = tableConfig.getMaterializedViewTaskConfigs();
+    if (configs == null) {
+      return null;
+    }
+    return 
configs.get(org.apache.pinot.spi.utils.CommonConstants.MaterializedViewTask.DEFINED_SQL_KEY);
+  }
+
   /**
    * Validates task configuration to ensure no conflicting task types are 
configured.
    */
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index e5f8e5d51ce..db06da4dabd 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -4261,4 +4261,78 @@ public class TableConfigUtilsTest {
     List<String> violations = 
TableConfigUtils.validateBackwardCompatibility(newConfig, existingConfig);
     assertTrue(violations.isEmpty(), "Expected no violations for non-upsert 
tables, but got: " + violations);
   }
+
+  @Test
+  public void 
testValidateMaterializedViewInvariantsFlagRequiresOfflineAndTask() {
+    TableConfig mvWithoutTask = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("mv_test")
+        .setIsMaterializedView(true).build();
+    assertThrows(IllegalStateException.class, () -> 
TableConfigUtils.validateMaterializedViewInvariants(mvWithoutTask));
+
+    TableConfig mvRealtime = new 
TableConfigBuilder(TableType.REALTIME).setTableName("mv_test")
+        .setIsMaterializedView(true)
+        .setTaskConfig(new 
org.apache.pinot.spi.config.table.TableTaskConfig(buildMaterializedViewTaskMap("SELECT
 1")))
+        .build();
+    assertThrows(IllegalStateException.class, () -> 
TableConfigUtils.validateMaterializedViewInvariants(mvRealtime));
+  }
+
+  @Test
+  public void testValidateMaterializedViewInvariantsTaskWithoutFlagRejected() {
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("mv_test")
+        .setTaskConfig(new org.apache.pinot.spi.config.table.TableTaskConfig(
+            buildMaterializedViewTaskMap("SELECT city FROM orders GROUP BY 
city")))
+        .build();
+    assertThrows(IllegalStateException.class,
+        () -> 
TableConfigUtils.validateMaterializedViewInvariants(tableConfig));
+  }
+
+  @Test
+  public void 
testValidateMaterializedViewInvariantsTaskWithoutDefinedSqlRejected() {
+    Map<String, Map<String, String>> taskTypeConfigsMap = new HashMap<>();
+    
taskTypeConfigsMap.put(org.apache.pinot.spi.utils.CommonConstants.MaterializedViewTask.TASK_TYPE,
+        Map.of("bucketTimePeriod", "1d"));
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("mv_test")
+        .setTaskConfig(new 
org.apache.pinot.spi.config.table.TableTaskConfig(taskTypeConfigsMap))
+        .build();
+    assertThrows(IllegalStateException.class,
+        () -> 
TableConfigUtils.validateMaterializedViewInvariants(tableConfig));
+  }
+
+  @Test
+  public void 
testValidateBackwardCompatibilityMaterializedViewDefinedSqlImmutable() {
+    String sql = "SELECT city FROM orders GROUP BY city";
+    TableConfig existingConfig = buildMaterializedViewTableConfig(sql);
+    TableConfig newConfig = buildMaterializedViewTableConfig("SELECT city, 
count(*) FROM orders GROUP BY city");
+
+    List<String> violations = 
TableConfigUtils.validateBackwardCompatibility(newConfig, existingConfig);
+    assertEquals(violations.size(), 1);
+    assertTrue(violations.get(0).contains("definedSQL is immutable"));
+  }
+
+  @Test
+  public void 
testValidateBackwardCompatibilityMaterializedViewFlagCannotChange() {
+    String sql = "SELECT city FROM orders GROUP BY city";
+    TableConfig existingConfig = buildMaterializedViewTableConfig(sql);
+    TableConfig newConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("mv_test")
+        .setTaskConfig(new 
org.apache.pinot.spi.config.table.TableTaskConfig(buildMaterializedViewTaskMap(sql)))
+        .build();
+
+    List<String> violations = 
TableConfigUtils.validateBackwardCompatibility(newConfig, existingConfig);
+    assertEquals(violations.size(), 1);
+    assertTrue(violations.get(0).contains("isMaterializedView"));
+  }
+
+  private static Map<String, Map<String, String>> 
buildMaterializedViewTaskMap(String definedSql) {
+    Map<String, String> mvTaskConfig = new HashMap<>();
+    
mvTaskConfig.put(org.apache.pinot.spi.utils.CommonConstants.MaterializedViewTask.DEFINED_SQL_KEY,
 definedSql);
+    Map<String, Map<String, String>> taskTypeConfigsMap = new HashMap<>();
+    
taskTypeConfigsMap.put(org.apache.pinot.spi.utils.CommonConstants.MaterializedViewTask.TASK_TYPE,
 mvTaskConfig);
+    return taskTypeConfigsMap;
+  }
+
+  private static TableConfig buildMaterializedViewTableConfig(String 
definedSql) {
+    return new TableConfigBuilder(TableType.OFFLINE).setTableName("mv_test")
+        .setIsMaterializedView(true)
+        .setTaskConfig(new 
org.apache.pinot.spi.config.table.TableTaskConfig(buildMaterializedViewTaskMap(definedSql)))
+        .build();
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index 7d214bf8d67..122e1784400 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -37,6 +37,7 @@ import 
org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.config.table.assignment.SegmentAssignmentConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.config.table.sampler.TableSamplerConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 
@@ -45,6 +46,7 @@ public class TableConfig extends BaseJsonConfig {
   public static final String TABLE_NAME_KEY = "tableName";
   public static final String TABLE_TYPE_KEY = "tableType";
   public static final String IS_DIM_TABLE_KEY = "isDimTable";
+  public static final String IS_MATERIALIZED_VIEW_KEY = "isMaterializedView";
   public static final String VALIDATION_CONFIG_KEY = "segmentsConfig";
   public static final String TENANT_CONFIG_KEY = "tenants";
   public static final String INDEXING_CONFIG_KEY = "tableIndexConfig";
@@ -82,6 +84,9 @@ public class TableConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Indicates whether the table is a dimension table 
or not")
   private final boolean _dimTable;
 
+  @JsonPropertyDescription("Indicates whether the table is a materialized view 
or not")
+  private final boolean _materializedView;
+
   private SegmentsValidationAndRetentionConfig _validationConfig;
   private TenantConfig _tenantConfig;
   private IndexingConfig _indexingConfig;
@@ -134,6 +139,30 @@ public class TableConfig extends BaseJsonConfig {
   @JsonPropertyDescription(value = "Configs for table samplers")
   private List<TableSamplerConfig> _tableSamplers;
 
+  /// Legacy constructor preserved for binary backward-compatibility on the 
public SPI surface.
+  /// Callers compiled against the pre-`isMaterializedView` signature still 
link against this entry
+  /// point; it forwards to the canonical constructor with 
`materializedView=false`. Prefer the
+  /// {@link TableConfigBuilder} or the canonical constructor below for new 
code.
+  @Deprecated
+  public TableConfig(String tableName, String tableType,
+      SegmentsValidationAndRetentionConfig validationConfig, TenantConfig 
tenantConfig,
+      IndexingConfig indexingConfig, TableCustomConfig customConfig, @Nullable 
QuotaConfig quotaConfig,
+      @Nullable TableTaskConfig taskConfig, @Nullable RoutingConfig 
routingConfig,
+      @Nullable QueryConfig queryConfig,
+      @Nullable Map<String, InstanceAssignmentConfig> 
instanceAssignmentConfigMap,
+      @Nullable List<FieldConfig> fieldConfigList, @Nullable UpsertConfig 
upsertConfig,
+      @Nullable DedupConfig dedupConfig, @Nullable DimensionTableConfig 
dimensionTableConfig,
+      @Nullable IngestionConfig ingestionConfig, @Nullable List<TierConfig> 
tierConfigsList, boolean dimTable,
+      @Nullable List<TunerConfig> tunerConfigList,
+      @Nullable Map<InstancePartitionsType, String> instancePartitionsMap,
+      @Nullable Map<String, SegmentAssignmentConfig> 
segmentAssignmentConfigMap,
+      @Nullable List<TableSamplerConfig> tableSamplers) {
+    this(tableName, tableType, validationConfig, tenantConfig, indexingConfig, 
customConfig, quotaConfig, taskConfig,
+        routingConfig, queryConfig, instanceAssignmentConfigMap, 
fieldConfigList, upsertConfig, dedupConfig,
+        dimensionTableConfig, ingestionConfig, tierConfigsList, dimTable, 
tunerConfigList, instancePartitionsMap,
+        segmentAssignmentConfigMap, tableSamplers, /*materializedView=*/ 
false);
+  }
+
   @JsonCreator
   public TableConfig(@JsonProperty(value = TABLE_NAME_KEY, required = true) 
String tableName,
       @JsonProperty(value = TABLE_TYPE_KEY, required = true) String tableType,
@@ -160,7 +189,8 @@ public class TableConfig extends BaseJsonConfig {
       Map<InstancePartitionsType, String> instancePartitionsMap,
       @JsonProperty(SEGMENT_ASSIGNMENT_CONFIG_MAP_KEY) @Nullable
       Map<String, SegmentAssignmentConfig> segmentAssignmentConfigMap,
-      @JsonProperty(TABLE_SAMPLERS_KEY) @Nullable List<TableSamplerConfig> 
tableSamplers) {
+      @JsonProperty(TABLE_SAMPLERS_KEY) @Nullable List<TableSamplerConfig> 
tableSamplers,
+      @JsonProperty(IS_MATERIALIZED_VIEW_KEY) boolean materializedView) {
     Preconditions.checkArgument(tableName != null, "'tableName' must be 
configured");
     
Preconditions.checkArgument(!tableName.contains(TABLE_NAME_FORBIDDEN_SUBSTRING),
         "'tableName' cannot contain double underscore ('__')");
@@ -188,6 +218,7 @@ public class TableConfig extends BaseJsonConfig {
     _ingestionConfig = ingestionConfig;
     _tierConfigsList = tierConfigsList;
     _dimTable = dimTable;
+    _materializedView = materializedView;
     _tunerConfigList = tunerConfigList;
     _instancePartitionsMap = instancePartitionsMap;
     _segmentAssignmentConfigMap = segmentAssignmentConfigMap;
@@ -213,6 +244,7 @@ public class TableConfig extends BaseJsonConfig {
     _ingestionConfig = tableConfig.getIngestionConfig();
     _tierConfigsList = tableConfig.getTierConfigsList();
     _dimTable = tableConfig.isDimTable();
+    _materializedView = tableConfig.isMaterializedView();
     _tunerConfigList = tableConfig.getTunerConfigsList();
     _instancePartitionsMap = tableConfig.getInstancePartitionsMap();
     _segmentAssignmentConfigMap = tableConfig.getSegmentAssignmentConfigMap();
@@ -261,6 +293,32 @@ public class TableConfig extends BaseJsonConfig {
     return _dimTable;
   }
 
+  @JsonProperty(IS_MATERIALIZED_VIEW_KEY)
+  public boolean isMaterializedView() {
+    return _materializedView;
+  }
+
+  /// Returns task configs for 
[CommonConstants.MaterializedViewTask#TASK_TYPE], or null if absent.
+  @JsonIgnore
+  @Nullable
+  public Map<String, String> getMaterializedViewTaskConfigs() {
+    if (_taskConfig == null) {
+      return null;
+    }
+    return 
_taskConfig.getConfigsForTaskType(CommonConstants.MaterializedViewTask.TASK_TYPE);
+  }
+
+  /// Whether the table has a non-empty `definedSQL` under 
[CommonConstants.MaterializedViewTask#TASK_TYPE].
+  @JsonIgnore
+  public boolean hasMaterializedViewTaskWithDefinedSql() {
+    Map<String, String> configs = getMaterializedViewTaskConfigs();
+    if (configs == null) {
+      return false;
+    }
+    String definedSql = 
configs.get(CommonConstants.MaterializedViewTask.DEFINED_SQL_KEY);
+    return definedSql != null && !definedSql.trim().isEmpty();
+  }
+
   @JsonProperty(VALIDATION_CONFIG_KEY)
   public SegmentsValidationAndRetentionConfig getValidationConfig() {
     return _validationConfig;
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index 3b316d33739..38db0db9025 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -66,6 +66,7 @@ public class TableConfigBuilder {
   private final TableType _tableType;
   private String _tableName;
   private boolean _isDimTable;
+  private boolean _isMaterializedView;
 
   // Segments config related
   private String _numReplicas = DEFAULT_NUM_REPLICAS;
@@ -161,6 +162,11 @@ public class TableConfigBuilder {
     return this;
   }
 
+  public TableConfigBuilder setIsMaterializedView(boolean isMaterializedView) {
+    _isMaterializedView = isMaterializedView;
+    return this;
+  }
+
   public TableConfigBuilder addFieldConfig(FieldConfig config) {
     if (_fieldConfigList == null) {
       _fieldConfigList = new ArrayList<>();
@@ -568,7 +574,8 @@ public class TableConfigBuilder {
         new TableConfig(_tableName, _tableType.toString(), validationConfig, 
tenantConfig, indexingConfig,
             _customConfig, _quotaConfig, _taskConfig, _routingConfig, 
_queryConfig, _instanceAssignmentConfigMap,
             _fieldConfigList, _upsertConfig, _dedupConfig, 
_dimensionTableConfig, _ingestionConfig, _tierConfigList,
-            _isDimTable, _tunerConfigList, _instancePartitionsMap, 
_segmentAssignmentConfigMap, _tableSamplers);
+            _isDimTable, _tunerConfigList, _instancePartitionsMap, 
_segmentAssignmentConfigMap,
+            _tableSamplers, _isMaterializedView);
     tableConfig.setDescription(_description);
     tableConfig.setTags(_tags);
     return tableConfig;
diff --git 
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/PropertyMapping.java
 
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/PropertyMapping.java
index bc32fd4967c..a1673942d40 100644
--- 
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/PropertyMapping.java
+++ 
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/PropertyMapping.java
@@ -300,6 +300,9 @@ public final class PropertyMapping {
       case "isdimtable":
         builder.setIsDimTable(parseBool(lowerKey, value));
         return true;
+      case "ismaterializedview":
+        builder.setIsMaterializedView(parseBool(lowerKey, value));
+        return true;
       case "invertedindexcolumns":
         builder.setInvertedIndexColumns(splitCsv(value));
         return true;
diff --git 
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/reverse/PropertyExtractor.java
 
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/reverse/PropertyExtractor.java
index f33767850a4..b80335e4aad 100644
--- 
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/reverse/PropertyExtractor.java
+++ 
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/reverse/PropertyExtractor.java
@@ -332,6 +332,9 @@ public final class PropertyExtractor {
     if (config.isDimTable()) {
       props.put("isDimTable", "true");
     }
+    if (config.isMaterializedView()) {
+      props.put("isMaterializedView", "true");
+    }
     putIfPresent(props, "description", config.getDescription());
     List<String> tags = config.getTags();
     if (tags != null && !tags.isEmpty()) {
diff --git 
a/pinot-tools/src/main/resources/examples/batch/airlineStatsMv/airlineStatsMv_offline_table_config.json
 
b/pinot-tools/src/main/resources/examples/batch/airlineStatsMv/airlineStatsMv_offline_table_config.json
index 26682fe8903..02c3728e9cc 100644
--- 
a/pinot-tools/src/main/resources/examples/batch/airlineStatsMv/airlineStatsMv_offline_table_config.json
+++ 
b/pinot-tools/src/main/resources/examples/batch/airlineStatsMv/airlineStatsMv_offline_table_config.json
@@ -1,6 +1,7 @@
 {
   "tableName": "airlineStatsMv",
   "tableType": "OFFLINE",
+  "isMaterializedView": true,
   "segmentsConfig": {
     "timeColumnName": "tsMs",
     "timeType": "MILLISECONDS",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to