This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d54ec21ad56 Add check to not enable dropOutOfOrderRecord or 
outOfOrderColumn when consistencyMode != NONE (#17550)
d54ec21ad56 is described below

commit d54ec21ad56a7885cb66ff3dc6d05ddcae2dd06b
Author: Chaitanya Deepthi <[email protected]>
AuthorDate: Thu Jan 22 20:20:19 2026 -0800

    Add check to not enable dropOutOfOrderRecord or outOfOrderColumn when 
consistencyMode != NONE (#17550)
---
 .../realtime/RealtimeSegmentDataManager.java       |   4 +-
 .../indexsegment/mutable/MutableSegmentImpl.java   |   5 +
 .../upsert/BasePartitionUpsertMetadataManager.java |  19 ++--
 .../pinot/segment/local/upsert/UpsertContext.java  |  22 +++-
 .../segment/local/utils/TableConfigUtils.java      |  12 +++
 .../segment/local/utils/TableConfigUtilsTest.java  | 115 +++++++++++++++++++++
 6 files changed, 165 insertions(+), 12 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 788c1b14bc5..5f4fdc22138 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1520,8 +1520,8 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
             if (_realtimeTableDataManager.isUpsertEnabled()
                 && _realtimeTableDataManager.getTableUpsertMetadataManager() 
!= null) {
               UpsertContext context = 
_realtimeTableDataManager.getTableUpsertMetadataManager().getContext();
-              if (_realtimeTableDataManager.isPartialUpsertEnabled() || 
(context.isDropOutOfOrderRecord()
-                  && context.getConsistencyMode() == 
UpsertConfig.ConsistencyMode.NONE)) {
+              if (_realtimeTableDataManager.isPartialUpsertEnabled() || 
context.isDropOutOfOrderRecord()
+                  || context.getOutOfOrderRecordColumn() != null) {
                 _serverMetrics.addMeteredTableValue(_clientId, 
ServerMeter.REALTIME_ROWS_AHEAD_OF_ZK, 1L);
               }
             }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index c13a17ec430..2e8d9bb4bf5 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -640,6 +640,11 @@ public class MutableSegmentImpl implements MutableSegment {
     if (isUpsertEnabled()) {
       RecordInfo recordInfo = getRecordInfo(row, numDocsIndexed);
       GenericRow updatedRow = 
_partitionUpsertMetadataManager.updateRecord(row, recordInfo);
+      // NOTE: out-of-order records cannot be dropped or marked when the consistent upsert view is enabled.
+      // The record is indexed and the _numDocsIndexed counter is updated before the upsert metadata is updated,
+      // so by the time a record is known to be out-of-order, it can no longer be dropped or marked. This order is
+      // required for the consistent upsert view; otherwise the latest doc can be missed by queries due to the
+      // 'docId < _numDocs' check in filter operators. The record becomes queryable before validDocIds are updated.
       if (_upsertConsistencyMode != UpsertConfig.ConsistencyMode.NONE) {
         updateDictionary(updatedRow);
         addNewRow(numDocsIndexed, updatedRow);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 1c64c6ee3b9..ab414dd6ede 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -675,15 +675,16 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
       MutableRoaringBitmap validDocIdsForOldSegment, String segmentName) {
     int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
     boolean isConsumingSegmentSeal = !(oldSegment instanceof ImmutableSegment);
-    // For partial-upsert table and upsert table with dropOutOfOrder=true & 
consistencyMode = NONE, we do not store
-    // the previous record location when removing the primary keys not 
replaced, it can potentially cause inconsistency
-    // between replicas. This can happen when a consuming segment is replaced 
by a committed segment that is consumed
-    // from a different server with different records (some stream consumer 
cannot guarantee consuming the messages in
-    // the same order/ when a segment is replaced with lesser consumed rows 
from the other server).
-    if (isConsumingSegmentSeal && _context.isDropOutOfOrderRecord()
-        && _context.getConsistencyMode() == UpsertConfig.ConsistencyMode.NONE) 
{
+    // For partial-upsert tables and upsert tables with dropOutOfOrderRecord=true or outOfOrderRecordColumn configured,
+    // removing the primary keys that were not replaced without restoring their previous record locations can
+    // lead to inconsistencies across replicas when a consuming segment is replaced by a committed segment generated
+    // on a different server with a different set of records. Such scenarios can occur when stream consumers do not
+    // guarantee the same consumption order, or when a segment with fewer consumed rows replaces another segment.
+    // To prevent these inconsistencies, the previous record locations are retained so that the metadata changes can
+    // be reverted and a segment replacement triggered, restoring consistency across replicas.
+    if (isConsumingSegmentSeal && (_context.isDropOutOfOrderRecord() || 
_context.getOutOfOrderRecordColumn() != null)) {
       _logger.warn("Found {} primary keys not replaced when sealing consuming 
segment: {} for upsert table with "
-              + "dropOutOfOrderRecord enabled with no consistency mode. This 
can potentially cause inconsistency "
+              + "dropOutOfOrderRecord or outOfOrderRecordColumn enabled. This 
can potentially cause inconsistency "
               + "between replicas. Reverting back metadata changes and 
triggering segment replacement.",
           numKeysNotReplaced,
           segmentName);
@@ -735,7 +736,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
                 + "Proceeding with current state which may cause 
inconsistency.", MAX_UPSERT_REVERT_RETRIES,
             segmentName,
             numKeysStillNotReplaced);
-        if (_context.isDropOutOfOrderRecord() && _context.getConsistencyMode() 
== UpsertConfig.ConsistencyMode.NONE) {
+        if (_context.isDropOutOfOrderRecord() || 
_context.getOutOfOrderRecordColumn() != null) {
           _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.REALTIME_UPSERT_INCONSISTENT_ROWS,
               numKeysStillNotReplaced);
         } else if (_partialUpsertHandler != null) {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
index 41aace4c040..d576e325979 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java
@@ -30,9 +30,12 @@ import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class UpsertContext {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(UpsertContext.class);
   private final TableConfig _tableConfig;
   private final Schema _schema;
   private final List<String> _primaryKeyColumns;
@@ -64,7 +67,6 @@ public class UpsertContext {
   @Nullable
   private final TableDataManager _tableDataManager;
   private final File _tableIndexDir;
-
   private UpsertContext(TableConfig tableConfig, Schema schema, List<String> 
primaryKeyColumns,
       HashFunction hashFunction, List<String> comparisonColumns, @Nullable 
PartialUpsertHandler partialUpsertHandler,
       @Nullable String deleteRecordColumn, boolean dropOutOfOrderRecord, 
@Nullable String outOfOrderRecordColumn,
@@ -223,6 +225,7 @@ public class UpsertContext {
     private PartialUpsertHandler _partialUpsertHandler;
     private String _deleteRecordColumn;
     private boolean _dropOutOfOrderRecord;
+    @Nullable
     private String _outOfOrderRecordColumn;
     private boolean _enableSnapshot;
     private boolean _enablePreload;
@@ -355,6 +358,23 @@ public class UpsertContext {
         Preconditions.checkState(_tableDataManager != null, "Either table data 
manager or table index dir must be set");
         _tableIndexDir = _tableDataManager.getTableDataDir();
       }
+      // dropOutOfOrderRecord and outOfOrderRecordColumn are not supported when consistencyMode is SYNC or SNAPSHOT.
+      // In these modes, records are indexed before metadata is updated, so we can't drop or mark out-of-order records.
+      // Disable them (logging a warning) instead of failing, for backward compatibility with existing tables.
+      if (_consistencyMode != UpsertConfig.ConsistencyMode.NONE) {
+        if (_dropOutOfOrderRecord) {
+          _dropOutOfOrderRecord = false;
+          LOGGER.warn(
+              "dropOutOfOrderRecord is not supported when consistencyMode is 
set, disabling dropOutOfOrderRecord to get"
+                  + "consistent mode: {} for the table: {}", _consistencyMode, 
_tableConfig.getTableName());
+        }
+        if (_outOfOrderRecordColumn != null) {
+          _outOfOrderRecordColumn = null;
+          LOGGER.warn(
+              "outOfOrderRecordColumn is not supported when consistencyMode is 
set, removing outOfOrderRecordColumn "
+                  + "to get consistent mode: {} for the table: {}", 
_consistencyMode, _tableConfig.getTableName());
+        }
+      }
       return new UpsertContext(_tableConfig, _schema, _primaryKeyColumns, 
_hashFunction, _comparisonColumns,
           _partialUpsertHandler, _deleteRecordColumn, _dropOutOfOrderRecord, 
_outOfOrderRecordColumn, _enableSnapshot,
           _enablePreload, _metadataTTL, _deletedKeysTTL, 
_enableDeletedKeysCompactionConsistency, _consistencyMode,
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index dc04193737c..f7e89f6660c 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -881,6 +881,18 @@ public final class TableConfigUtils {
         Preconditions.checkState(upsertConfig.getNewSegmentTrackingTimeMs() > 
0,
             "Positive newSegmentTrackingTimeMs is required to enable 
consistency mode: "
                 + upsertConfig.getConsistencyMode());
+        // dropOutOfOrderRecord and outOfOrderRecordColumn are not supported with SYNC/SNAPSHOT consistency mode
+        // because records must be indexed before the upsert metadata is updated. By the time we know whether a
+        // record is out-of-order, it has already been indexed; currently we can neither skip indexing it nor
+        // update its out-of-order column value.
+        Preconditions.checkState(!upsertConfig.isDropOutOfOrderRecord(),
+            "dropOutOfOrderRecord cannot be enabled when consistencyMode is 
%s. "
+                + "Out-of-order records can only be dropped in NONE 
consistency mode.",
+            upsertConfig.getConsistencyMode());
+        Preconditions.checkState(upsertConfig.getOutOfOrderRecordColumn() == 
null,
+            "outOfOrderRecordColumn cannot be configured when consistencyMode 
is %s. "
+                + "Out-of-order record marking is only supported in NONE 
consistency mode.",
+            upsertConfig.getConsistencyMode());
       }
     }
 
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index ceb7df1d163..ba2495355d3 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -2501,6 +2501,121 @@ public class TableConfigUtilsTest {
       assertEquals(e.getMessage(), "The outOfOrderRecordColumn must be a 
single-valued BOOLEAN column");
     }
 
+    // test dropOutOfOrderRecord cannot be enabled with SYNC consistency mode
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
+        .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+        .addSingleValueDimension("myPkCol", FieldSpec.DataType.STRING)
+        .build();
+    streamConfigs = getStreamConfigs();
+    upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setDropOutOfOrderRecord(true);
+    upsertConfig.setConsistencyMode(UpsertConfig.ConsistencyMode.SYNC);
+    upsertConfig.setNewSegmentTrackingTimeMs(60000L);
+    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setStreamConfigs(streamConfigs)
+        .setUpsertConfig(upsertConfig)
+        .setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+        .build();
+    try {
+      TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+      fail("Expected IllegalStateException for dropOutOfOrderRecord with SYNC 
consistency mode");
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains("dropOutOfOrderRecord cannot be 
enabled when consistencyMode is SYNC"));
+    }
+
+    // test dropOutOfOrderRecord cannot be enabled with SNAPSHOT consistency 
mode
+    upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setDropOutOfOrderRecord(true);
+    upsertConfig.setConsistencyMode(UpsertConfig.ConsistencyMode.SNAPSHOT);
+    upsertConfig.setNewSegmentTrackingTimeMs(60000L);
+    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setStreamConfigs(streamConfigs)
+        .setUpsertConfig(upsertConfig)
+        .setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+        .build();
+    try {
+      TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+      fail("Expected IllegalStateException for dropOutOfOrderRecord with 
SNAPSHOT consistency mode");
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains("dropOutOfOrderRecord cannot be 
enabled when consistencyMode is SNAPSHOT"));
+    }
+
+    // test dropOutOfOrderRecord is allowed with NONE consistency mode (should 
not throw)
+    upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setDropOutOfOrderRecord(true);
+    upsertConfig.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE);
+    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setStreamConfigs(streamConfigs)
+        .setUpsertConfig(upsertConfig)
+        .setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+        .build();
+    TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+
+    // test outOfOrderRecordColumn cannot be enabled with SYNC consistency mode
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
+        .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+        .addSingleValueDimension("myPkCol", FieldSpec.DataType.STRING)
+        .addSingleValueDimension("isOutOfOrder", FieldSpec.DataType.BOOLEAN)
+        .build();
+    upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setOutOfOrderRecordColumn("isOutOfOrder");
+    upsertConfig.setConsistencyMode(UpsertConfig.ConsistencyMode.SYNC);
+    upsertConfig.setNewSegmentTrackingTimeMs(60000L);
+    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setStreamConfigs(streamConfigs)
+        .setUpsertConfig(upsertConfig)
+        .setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+        .build();
+    try {
+      TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+      fail("Expected IllegalStateException for outOfOrderRecordColumn with 
SYNC consistency mode");
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains("outOfOrderRecordColumn cannot be 
configured when consistencyMode is SYNC"));
+    }
+
+    // test outOfOrderRecordColumn cannot be enabled with SNAPSHOT consistency 
mode
+    upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setOutOfOrderRecordColumn("isOutOfOrder");
+    upsertConfig.setConsistencyMode(UpsertConfig.ConsistencyMode.SNAPSHOT);
+    upsertConfig.setNewSegmentTrackingTimeMs(60000L);
+    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setStreamConfigs(streamConfigs)
+        .setUpsertConfig(upsertConfig)
+        .setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+        .build();
+    try {
+      TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+      fail("Expected IllegalStateException for outOfOrderRecordColumn with 
SNAPSHOT consistency mode");
+    } catch (IllegalStateException e) {
+      assertTrue(
+          e.getMessage().contains("outOfOrderRecordColumn cannot be configured 
when consistencyMode is SNAPSHOT"));
+    }
+
+    // test outOfOrderRecordColumn is allowed with NONE consistency mode 
(should not throw)
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
+        .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+        .addSingleValueDimension("myPkCol", FieldSpec.DataType.STRING)
+        .addSingleValueDimension("isOutOfOrder", FieldSpec.DataType.BOOLEAN)
+        .build();
+    upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setOutOfOrderRecordColumn("isOutOfOrder");
+    upsertConfig.setConsistencyMode(UpsertConfig.ConsistencyMode.NONE);
+    tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setStreamConfigs(streamConfigs)
+        .setUpsertConfig(upsertConfig)
+        .setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+        .build();
+    TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+
     // test enableDeletedKeysCompactionConsistency shouldn't exist with 
metadataTTL
     upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
     upsertConfig.setEnableDeletedKeysCompactionConsistency(true);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to