This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4633cf58866 Add checks to not update upsert / dedup configs after 
table creation (#17645)
4633cf58866 is described below

commit 4633cf58866901a29887ec7004e912f377d12d60
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Wed Mar 4 12:36:23 2026 -0800

    Add checks to not update upsert / dedup configs after table creation 
(#17645)
---
 .../TableConfigBackwardIncompatibleException.java  |  26 ++++
 .../api/resources/PinotTableRestletResource.java   |  12 +-
 .../api/resources/TableConfigsRestletResource.java |  13 +-
 .../helix/core/PinotHelixResourceManager.java      |  77 ++++++++++-
 .../RealtimeOffsetAutoResetKafkaHandler.java       |  14 +-
 .../helix/LogicalTableMetadataCacheTest.java       |   2 +-
 .../segment/local/utils/TableConfigUtils.java      | 143 +++++++++++++++++++++
 .../java/org/apache/pinot/spi/data/Schema.java     |  12 ++
 8 files changed, 280 insertions(+), 19 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/exception/TableConfigBackwardIncompatibleException.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/exception/TableConfigBackwardIncompatibleException.java
new file mode 100644
index 00000000000..21125a3b0f7
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/exception/TableConfigBackwardIncompatibleException.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.exception;
+
+public class TableConfigBackwardIncompatibleException extends Exception {
+
+  public TableConfigBackwardIncompatibleException(String message) {
+    super(message);
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 9d934c4d184..498f0349beb 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -79,6 +79,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.exception.RebalanceInProgressException;
 import org.apache.pinot.common.exception.SchemaNotFoundException;
+import 
org.apache.pinot.common.exception.TableConfigBackwardIncompatibleException;
 import org.apache.pinot.common.exception.TableNotFoundException;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.ControllerMeter;
@@ -727,7 +728,10 @@ public class PinotTableRestletResource {
   public ConfigSuccessResponse updateTableConfig(
       @ApiParam(value = "Name of the table to update", required = true) 
@PathParam("tableName") String tableName,
       @ApiParam(value = "comma separated list of validation type(s) to skip. 
supported types: (ALL|TASK|UPSERT)")
-      @QueryParam("validationTypesToSkip") @Nullable String typesToSkip, 
@Context HttpHeaders headers,
+      @QueryParam("validationTypesToSkip") @Nullable String typesToSkip,
+      @ApiParam(value = "Force config changes")
+      @QueryParam("force") @DefaultValue("false") boolean force,
+      @Context HttpHeaders headers,
       String tableConfigString)
       throws Exception {
     Pair<TableConfig, Map<String, Object>> 
tableConfigAndUnrecognizedProperties;
@@ -762,7 +766,11 @@ public class PinotTableRestletResource {
         throw new ControllerApplicationException(LOGGER, "Table " + 
tableNameWithType + " does not exist",
             Response.Status.NOT_FOUND);
       }
-      _pinotHelixResourceManager.updateTableConfig(tableConfig);
+      _pinotHelixResourceManager.updateTableConfig(tableConfig, force);
+    } catch (TableConfigBackwardIncompatibleException e) {
+      String errStr = String.format("Failed to update configuration for %s due 
to: %s", tableName, e.getMessage());
+      
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_UPDATE_ERROR,
 1L);
+      throw new ControllerApplicationException(LOGGER, errStr, 
Response.Status.BAD_REQUEST, e);
     } catch (InvalidTableConfigException e) {
       String errStr = String.format("Failed to update configuration for %s due 
to: %s", tableName, e.getMessage());
       
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_UPDATE_ERROR,
 1L);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java
index 72bb48a1878..744d5a1dd22 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java
@@ -49,6 +49,7 @@ import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.tuple.Pair;
+import 
org.apache.pinot.common.exception.TableConfigBackwardIncompatibleException;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -357,8 +358,8 @@ public class TableConfigsRestletResource {
   @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = 
Actions.Table.UPDATE_TABLE_CONFIGS)
   @Authenticate(AccessType.UPDATE)
   @Produces(MediaType.APPLICATION_JSON)
-  @ApiOperation(value = "Update the TableConfigs provided by the 
tableConfigsStr json",
-      notes = "Update the TableConfigs provided by the tableConfigsStr json")
+  @ApiOperation(value = "Update the TableConfigs provided by the 
tableConfigsStr json", notes = "Update the "
+      + "TableConfigs provided by the tableConfigsStr json")
   public ConfigSuccessResponse updateConfig(
       @ApiParam(value = "TableConfigs name i.e. raw table name", required = 
true) @PathParam("tableName")
       String tableName,
@@ -405,7 +406,7 @@ public class TableConfigsRestletResource {
       if (offlineTableConfig != null) {
         tuneConfig(offlineTableConfig, schema);
         if (_pinotHelixResourceManager.hasOfflineTable(tableName)) {
-          _pinotHelixResourceManager.updateTableConfig(offlineTableConfig);
+          _pinotHelixResourceManager.updateTableConfig(offlineTableConfig, 
forceTableSchemaUpdate);
           LOGGER.info("Updated offline table config: {}", tableName);
         } else {
           _pinotHelixResourceManager.addTable(offlineTableConfig);
@@ -415,13 +416,17 @@ public class TableConfigsRestletResource {
       if (realtimeTableConfig != null) {
         tuneConfig(realtimeTableConfig, schema);
         if (_pinotHelixResourceManager.hasRealtimeTable(tableName)) {
-          _pinotHelixResourceManager.updateTableConfig(realtimeTableConfig);
+          _pinotHelixResourceManager.updateTableConfig(realtimeTableConfig, 
forceTableSchemaUpdate);
           LOGGER.info("Updated realtime table config: {}", tableName);
         } else {
           _pinotHelixResourceManager.addTable(realtimeTableConfig);
           LOGGER.info("Created realtime table config: {}", tableName);
         }
       }
+    } catch (TableConfigBackwardIncompatibleException e) {
+      
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_UPDATE_ERROR,
 1L);
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Invalid TableConfigs for: %s, %s", tableName, 
e.getMessage()), Response.Status.BAD_REQUEST, e);
     } catch (InvalidTableConfigException e) {
       
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_TABLE_UPDATE_ERROR,
 1L);
       throw new ControllerApplicationException(LOGGER,
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index d0f204087b0..8ae187ab3a6 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -43,6 +43,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
@@ -95,6 +96,7 @@ import 
org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.exception.SchemaAlreadyExistsException;
 import org.apache.pinot.common.exception.SchemaBackwardIncompatibleException;
 import org.apache.pinot.common.exception.SchemaNotFoundException;
+import 
org.apache.pinot.common.exception.TableConfigBackwardIncompatibleException;
 import org.apache.pinot.common.exception.TableNotFoundException;
 import org.apache.pinot.common.lineage.LineageEntry;
 import org.apache.pinot.common.lineage.LineageEntryState;
@@ -155,6 +157,7 @@ import 
org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
 import org.apache.pinot.controller.workload.QueryWorkloadManager;
 import org.apache.pinot.core.util.NumberUtils;
 import org.apache.pinot.core.util.NumericException;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.config.DatabaseConfig;
 import org.apache.pinot.spi.config.instance.Instance;
@@ -1571,6 +1574,7 @@ public class PinotHelixResourceManager {
       LOGGER.info("New schema: {} is the same as the existing schema, not 
updating it", schemaName);
       return;
     }
+
     boolean isBackwardCompatible = schema.isBackwardCompatibleWith(oldSchema);
     if (!isBackwardCompatible) {
       if (forceTableSchemaUpdate) {
@@ -1582,6 +1586,17 @@ public class PinotHelixResourceManager {
             .append(" is not backward-compatible with the existing schema.");
         errorMsg.append("\n\nIncompatibility Details:");
 
+        // Check for primary key column changes
+        // Allow adding primary keys if not present. Helps add upsert and 
dedup configs to existing tables.
+        List<String> oldPrimaryKeys = oldSchema.getPrimaryKeyColumns();
+        List<String> newPrimaryKeys = schema.getPrimaryKeyColumns();
+        if (CollectionUtils.isNotEmpty(oldPrimaryKeys)) {
+          if (!Objects.equals(oldPrimaryKeys, newPrimaryKeys)) {
+            errorMsg.append("\n- Primary key columns changed 
(").append(oldPrimaryKeys).append(" -> ")
+                .append(newPrimaryKeys).append(")");
+          }
+        }
+
         // Check for missing columns
         Set<String> newSchemaColumns = schema.getColumnNames();
         List<String> missingColumns = new ArrayList<>();
@@ -1621,9 +1636,10 @@ public class PinotHelixResourceManager {
         errorMsg.append("\n\nSuggestions to fix:");
         errorMsg.append("\n1. Ensure all columns from the existing schema are 
retained in the new schema");
         errorMsg.append("\n2. Do not change the data type or field type of 
existing columns");
-        errorMsg.append("\n3. New columns should be added as optional fields 
with default values");
-        errorMsg.append("\n4. If you must make breaking changes, consider 
creating a new schema version or use "
-            + "forceTableSchemaUpdate=true (use with caution)");
+        errorMsg.append("\n3. Do not change primary key columns");
+        errorMsg.append("\n4. New columns should be added as optional fields 
with default values");
+        errorMsg.append("\n5. If you must make breaking changes, consider 
creating a new schema version or use "
+            + "force=true (use with caution)");
 
         throw new SchemaBackwardIncompatibleException(errorMsg.toString());
       }
@@ -2145,12 +2161,25 @@ public class PinotHelixResourceManager {
   /**
    * Validate the table config and update it
    * @throws IOException
+   * @throws TableConfigBackwardIncompatibleException if config changes are 
backward incompatible
    */
   public void updateTableConfig(TableConfig tableConfig)
-      throws IOException {
+      throws IOException, TableConfigBackwardIncompatibleException {
+    updateTableConfig(tableConfig, false);
+  }
+
+  /**
+   * Validate the table config and update it
+   * @param tableConfig the table config to update
+   * @param force if true, allows upsert/dedup config changes with a warning
+   * @throws IOException
+   * @throws TableConfigBackwardIncompatibleException if config changes are 
backward incompatible and force is false
+   */
+  public void updateTableConfig(TableConfig tableConfig, boolean force)
+      throws IOException, TableConfigBackwardIncompatibleException {
     validateTableTenantConfig(tableConfig);
     validateTableTaskMinionInstanceTagConfig(tableConfig);
-    setExistingTableConfig(tableConfig);
+    setExistingTableConfig(tableConfig, -1, force);
   }
 
   /**
@@ -2158,7 +2187,7 @@ public class PinotHelixResourceManager {
    * TODO - Make this private and always use updateTableConfig ?
    */
   public void setExistingTableConfig(TableConfig tableConfig)
-      throws IOException {
+      throws IOException, TableConfigBackwardIncompatibleException {
     setExistingTableConfig(tableConfig, -1);
   }
 
@@ -2259,9 +2288,43 @@ public class PinotHelixResourceManager {
   /**
    * Sets the given table config into zookeeper with the expected version, 
which is the previous tableConfig znRecord
    * version. If the expected version is -1, the version check is ignored.
+   *
+   * @throws TableConfigBackwardIncompatibleException if config changes are 
backward incompatible
+   */
+  public void setExistingTableConfig(TableConfig tableConfig, int 
expectedVersion)
+      throws TableConfigBackwardIncompatibleException {
+    setExistingTableConfig(tableConfig, expectedVersion, false);
+  }
+
+  /**
+   * Sets the given table config into zookeeper with the expected version.
+   *
+   * @param tableConfig the table config to set
+   * @param expectedVersion the expected version (-1 to ignore version check)
+   * @param force if true, allows upsert/dedup config changes with a warning
+   * @throws TableConfigBackwardIncompatibleException if config changes are 
backward incompatible and force is false
    */
-  public void setExistingTableConfig(TableConfig tableConfig, int 
expectedVersion) {
+  public void setExistingTableConfig(TableConfig tableConfig, int 
expectedVersion, boolean force)
+      throws TableConfigBackwardIncompatibleException {
     String tableNameWithType = tableConfig.getTableName();
+    TableConfig existingTableConfig = getTableConfig(tableNameWithType);
+    if (existingTableConfig != null) {
+      List<String> violations = 
TableConfigUtils.validateBackwardCompatibility(tableConfig, 
existingTableConfig);
+      if (!violations.isEmpty()) {
+        String tableName = tableConfig.getTableName();
+        if (force) {
+          LOGGER.warn("Forcing a config update for table: {} with violations: 
{}. "
+              + "This may cause data inconsistencies or data loss. Be cautious 
during compactions, and "
+              + "pause consumption beforehand and disable SNAPSHOT mode in 
upsertConfig and restart for the changes"
+              + " to kick in. If in doubt, recreate the table with the new 
configuration.", tableName, violations);
+        } else {
+          throw new TableConfigBackwardIncompatibleException(String.format(
+              "Failed to update table '%s': Cannot modify %s as it may lead to 
data inconsistencies. "
+                  + "Please create a new table instead.", tableName, 
violations));
+        }
+      }
+    }
+
     if (!ZKMetadataProvider.setTableConfig(_propertyStore, tableConfig, 
expectedVersion)) {
       throw new RuntimeException(
           "Failed to update table config in Zookeeper for table: " + 
tableNameWithType + " with" + " expected version: "
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/RealtimeOffsetAutoResetKafkaHandler.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/RealtimeOffsetAutoResetKafkaHandler.java
index 363455b0190..c17d9c6152e 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/RealtimeOffsetAutoResetKafkaHandler.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/RealtimeOffsetAutoResetKafkaHandler.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import 
org.apache.pinot.common.exception.TableConfigBackwardIncompatibleException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -70,12 +71,12 @@ public abstract class RealtimeOffsetAutoResetKafkaHandler 
implements RealtimeOff
    * @return true if successfully started the backfill job and its ingestion
    */
   @Override
-  public boolean triggerBackfillJob(
-      String tableNameWithType, StreamConfig streamConfig, String topicName, 
int partitionId, long fromOffset,
-      long toOffset) {
+  public boolean triggerBackfillJob(String tableNameWithType, StreamConfig 
streamConfig, String topicName,
+      int partitionId, long fromOffset, long toOffset) {
     // Trigger the data replication and get the new topic's stream config.
-    Map<String, String> newTopicStreamConfig = 
triggerDataReplicationAndGetTopicInfo(
-        tableNameWithType, streamConfig, topicName, partitionId, fromOffset, 
toOffset);
+    Map<String, String> newTopicStreamConfig =
+        triggerDataReplicationAndGetTopicInfo(tableNameWithType, streamConfig, 
topicName, partitionId, fromOffset,
+            toOffset);
     if (newTopicStreamConfig == null) {
       return false;
     }
@@ -88,6 +89,9 @@ public abstract class RealtimeOffsetAutoResetKafkaHandler 
implements RealtimeOff
     } catch (IOException e) {
       LOGGER.error("Cannot add backfill topic to the table config", e);
       return false;
+    } catch (TableConfigBackwardIncompatibleException e) {
+      LOGGER.error("Cannot add backfill topic to the table config: backward-incompatible config change", e);
+      return false;
     }
     return true;
   }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/LogicalTableMetadataCacheTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/LogicalTableMetadataCacheTest.java
index 6b8434d3ed3..2fcc5a35a54 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/LogicalTableMetadataCacheTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/LogicalTableMetadataCacheTest.java
@@ -105,7 +105,7 @@ public class LogicalTableMetadataCacheTest {
 
   @Test
   public void testLogicalTableCacheWithUpdates()
-      throws IOException {
+      throws Exception {
     String logicalTableName = "testLogicalTable1";
     LogicalTableConfig logicalTableConfig = addLogicalTableAndValidateCache(
         logicalTableName, List.of(_offlineTableConfig.getTableName(), 
_realtimeTableConfig.getTableName()));
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index a8453a1c3bf..0947c5f6938 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -32,6 +32,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -1183,6 +1184,148 @@ public final class TableConfigUtils {
     }
   }
 
+  /**
+   * Validates backward compatibility for table config updates.
+   * Checks critical upsert and dedup configuration fields that should not be 
changed.
+   *
+   * @param newConfig the new table config being applied
+   * @param existingConfig the existing table config
+   * @return list of violations (empty if no violations)
+   */
+  public static List<String> validateBackwardCompatibility(TableConfig 
newConfig, TableConfig existingConfig) {
+    List<String> violations = new ArrayList<>();
+    validateUpsertConfigUpdate(newConfig, existingConfig, violations);
+    validateDedupConfigUpdate(newConfig, existingConfig, violations);
+
+    return violations;
+  }
+
+  /**
+   * Validates that critical upsert configuration fields are not changed 
during table config update.
+   * Checks: mode, hashFunction, comparisonColumns, timeColumn (when no 
comparison columns),
+   * deleteRecordColumn, dropOutOfOrderRecord, outOfOrderRecordColumn,
+   * partialUpsertStrategies, defaultPartialUpsertStrategy.
+   *
+   * @param newConfig the new table config being applied
+   * @param existingConfig the existing table config
+   * @param violations list to collect violation messages
+   */
+  private static void validateUpsertConfigUpdate(TableConfig newConfig, 
TableConfig existingConfig,
+      List<String> violations) {
+    boolean existingUpsertEnabled = existingConfig.isUpsertEnabled();
+    boolean newUpsertEnabled = newConfig.isUpsertEnabled();
+
+    // Check if upsert is being added or removed
+    if (existingUpsertEnabled != newUpsertEnabled) {
+      if (existingUpsertEnabled) {
+        LOGGER.info("upsertConfig is removed from existing upsert table: {}", 
newConfig.getTableName());
+      } else {
+        LOGGER.info("upsertConfig is added to existing non-upsert table: {}", 
newConfig.getTableName());
+      }
+    } else if (existingUpsertEnabled) {
+      UpsertConfig existingUpsertConfig = existingConfig.getUpsertConfig();
+      UpsertConfig newUpsertConfig = newConfig.getUpsertConfig();
+
+      if (existingUpsertConfig.getMode() != newUpsertConfig.getMode()) {
+        violations.add(
+            String.format("upsertConfig.mode (%s -> %s)", 
existingUpsertConfig.getMode(), newUpsertConfig.getMode()));
+      }
+      if (existingUpsertConfig.getHashFunction() != 
newUpsertConfig.getHashFunction()) {
+        violations.add(String.format("upsertConfig.hashFunction (%s -> %s)", 
existingUpsertConfig.getHashFunction(),
+            newUpsertConfig.getHashFunction()));
+      }
+      if (!Objects.equals(existingUpsertConfig.getComparisonColumns(),
+          newUpsertConfig.getComparisonColumns())) {
+        violations.add(
+            String.format("upsertConfig.comparisonColumns (%s -> %s)", 
existingUpsertConfig.getComparisonColumns(),
+                newUpsertConfig.getComparisonColumns()));
+      }
+      List<String> existingComparisonColumns = 
existingUpsertConfig.getComparisonColumns();
+      if (existingComparisonColumns == null || 
existingComparisonColumns.isEmpty()) {
+        String existingTimeColumn =
+            existingConfig.getValidationConfig() != null ? 
existingConfig.getValidationConfig().getTimeColumnName()
+                : null;
+        String newTimeColumn =
+            newConfig.getValidationConfig() != null ? 
newConfig.getValidationConfig().getTimeColumnName() : null;
+        if (!Objects.equals(existingTimeColumn, newTimeColumn)) {
+          violations.add(
+              String.format("timeColumnName (%s -> %s) - used as default 
comparison column", existingTimeColumn,
+                  newTimeColumn));
+        }
+      }
+      if (existingUpsertConfig.isDropOutOfOrderRecord() != 
newUpsertConfig.isDropOutOfOrderRecord()) {
+        violations.add(
+            String.format("upsertConfig.dropOutOfOrderRecord (%s -> %s)", 
existingUpsertConfig.isDropOutOfOrderRecord(),
+                newUpsertConfig.isDropOutOfOrderRecord()));
+      }
+      if (!Objects.equals(existingUpsertConfig.getOutOfOrderRecordColumn(),
+          newUpsertConfig.getOutOfOrderRecordColumn())) {
+        violations.add(String.format("upsertConfig.outOfOrderRecordColumn (%s 
-> %s)",
+            existingUpsertConfig.getOutOfOrderRecordColumn(), 
newUpsertConfig.getOutOfOrderRecordColumn()));
+      }
+      if (existingUpsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) {
+        if (!Objects.equals(existingUpsertConfig.getPartialUpsertStrategies(),
+            newUpsertConfig.getPartialUpsertStrategies())) {
+          violations.add(String.format("upsertConfig.partialUpsertStrategies 
(%s -> %s)",
+              existingUpsertConfig.getPartialUpsertStrategies(), 
newUpsertConfig.getPartialUpsertStrategies()));
+        }
+        if (existingUpsertConfig.getDefaultPartialUpsertStrategy()
+            != newUpsertConfig.getDefaultPartialUpsertStrategy()) {
+          
violations.add(String.format("upsertConfig.defaultPartialUpsertStrategy (%s -> 
%s)",
+              existingUpsertConfig.getDefaultPartialUpsertStrategy(),
+              newUpsertConfig.getDefaultPartialUpsertStrategy()));
+        }
+      }
+    }
+  }
+
+  /**
+   * Validates that critical dedup configuration fields are not changed during 
table config update.
+   * Checks: dedupEnabled, hashFunction, dedupTimeColumn, timeColumnName (when 
dedupTimeColumn not specified).
+   *
+   * @param newConfig the new table config being applied
+   * @param existingConfig the existing table config
+   * @param violations list to collect violation messages
+   */
+  private static void validateDedupConfigUpdate(TableConfig newConfig, 
TableConfig existingConfig,
+      List<String> violations) {
+    boolean existingDedupEnabled = existingConfig.isDedupEnabled();
+    boolean newDedupEnabled = newConfig.isDedupEnabled();
+    if (existingDedupEnabled != newDedupEnabled) {
+      if (existingDedupEnabled) {
+        LOGGER.info("dedupConfig is removed from existing dedup table: {}", 
newConfig.getTableName());
+      } else {
+        LOGGER.info("dedupConfig is added into the existing non-dedup table: 
{}", newConfig.getTableName());
+      }
+    } else if (existingDedupEnabled) {
+      DedupConfig existingDedupConfig = existingConfig.getDedupConfig();
+      DedupConfig newDedupConfig = newConfig.getDedupConfig();
+
+      if (existingDedupConfig.getHashFunction() != 
newDedupConfig.getHashFunction()) {
+        violations.add(String.format("dedupConfig.hashFunction (%s -> %s)", 
existingDedupConfig.getHashFunction(),
+            newDedupConfig.getHashFunction()));
+      }
+
+      if (!Objects.equals(existingDedupConfig.getDedupTimeColumn(), 
newDedupConfig.getDedupTimeColumn())) {
+        violations.add(String.format("dedupConfig.dedupTimeColumn (%s -> %s)", 
existingDedupConfig.getDedupTimeColumn(),
+            newDedupConfig.getDedupTimeColumn()));
+      }
+      String existingDedupTimeColumn = 
existingDedupConfig.getDedupTimeColumn();
+      if (existingDedupTimeColumn == null || 
existingDedupTimeColumn.isEmpty()) {
+        String existingTimeColumn =
+            existingConfig.getValidationConfig() != null ? 
existingConfig.getValidationConfig().getTimeColumnName()
+                : null;
+        String newTimeColumn =
+            newConfig.getValidationConfig() != null ? 
newConfig.getValidationConfig().getTimeColumnName() : null;
+        if (!Objects.equals(existingTimeColumn, newTimeColumn)) {
+          violations.add(
+              String.format("timeColumnName (%s -> %s) - used as default dedup 
time column", existingTimeColumn,
+                  newTimeColumn));
+        }
+      }
+    }
+  }
+
   /**
    * Validates task configuration to ensure no conflicting task types are 
configured.
    */
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
index 57e2f9f27ca..a21961357c2 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
@@ -34,6 +34,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
@@ -41,6 +42,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.FieldSpec.FieldType;
@@ -840,10 +842,20 @@ public final class Schema implements Serializable {
    * Backward compatibility requires
    * (1) all columns in oldSchema should be retained.
    * (2) all column fieldSpecs should be backward compatible with the old ones.
+   * (3) primary key columns should not be changed if present(used in 
dimension tables, upsert, and dedup).
    *
    * @param oldSchema old schema
    */
   public boolean isBackwardCompatibleWith(Schema oldSchema) {
+    List<String> oldPrimaryKeys = oldSchema.getPrimaryKeyColumns();
+    List<String> newPrimaryKeys = getPrimaryKeyColumns();
+    // Allow adding primary keys if not present. Helps add upsert and dedup 
configs to existing tables.
+    if (CollectionUtils.isNotEmpty(oldPrimaryKeys)) {
+      if (!Objects.equals(oldPrimaryKeys, newPrimaryKeys)) {
+        return false;
+      }
+    }
+
     Set<String> columnNames = getColumnNames();
     for (Map.Entry<String, FieldSpec> entry : 
oldSchema.getFieldSpecMap().entrySet()) {
       String oldSchemaColumnName = entry.getKey();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to