klsince commented on code in PR #15528:
URL: https://github.com/apache/pinot/pull/15528#discussion_r2045522224


##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java:
##########
@@ -44,12 +46,16 @@ public enum ConsistencyMode {
   }
 
   @JsonPropertyDescription("Upsert mode.")
-  private Mode _mode;
+  private Mode _mode = Mode.FULL;
 
   @JsonPropertyDescription("Function to hash the primary key.")
   private HashFunction _hashFunction = HashFunction.NONE;
 
+  /// Maintains the mapping of merge strategies per column.

Review Comment:
   Actually, it looks like all comments in this file start with three `/` (`///`).



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java:
##########
@@ -40,64 +39,91 @@ public abstract class BaseTableDedupMetadataManager 
implements TableDedupMetadat
 
   protected final Map<Integer, PartitionDedupMetadataManager> 
_partitionMetadataManagerMap = new ConcurrentHashMap<>();
   protected String _tableNameWithType;
-  protected DedupContext _dedupContext;
-  private boolean _enablePreload;
+  protected DedupContext _context;
 
   @Override
-  public void init(TableConfig tableConfig, Schema schema, TableDataManager 
tableDataManager,
-      ServerMetrics serverMetrics) {
+  public void init(PinotConfiguration instanceDedupConfig, TableConfig 
tableConfig, Schema schema,
+      TableDataManager tableDataManager) {
     _tableNameWithType = tableConfig.getTableName();
 
+    Preconditions.checkArgument(tableConfig.isDedupEnabled(), "Dedup must be 
enabled for table: %s",
+        _tableNameWithType);
+    DedupConfig dedupConfig = tableConfig.getDedupConfig();
+    assert dedupConfig != null;
+
     List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
     Preconditions.checkArgument(!CollectionUtils.isEmpty(primaryKeyColumns),
         "Primary key columns must be configured for dedup enabled table: %s", 
_tableNameWithType);
 
-    DedupConfig dedupConfig = tableConfig.getDedupConfig();
-    Preconditions.checkArgument(dedupConfig != null, "Dedup must be enabled 
for table: %s", _tableNameWithType);
     double metadataTTL = dedupConfig.getMetadataTTL();
     String dedupTimeColumn = dedupConfig.getDedupTimeColumn();
     if (dedupTimeColumn == null) {
       dedupTimeColumn = tableConfig.getValidationConfig().getTimeColumnName();
     }
     if (metadataTTL > 0) {
       Preconditions.checkArgument(dedupTimeColumn != null,
-          "When metadataTTL is configured, metadata time column or time column 
must be configured for "
-              + "dedup enabled table: %s", _tableNameWithType);
+          "When metadataTTL is configured, metadata time column or time column 
must be configured for dedup enabled "
+              + "table: %s", _tableNameWithType);
+    }
+
+    boolean enablePreload =
+        dedupConfig.getPreload().isEnabled(() -> 
instanceDedupConfig.getProperty(Dedup.DEFAULT_ENABLE_PRELOAD, false));
+    if (enablePreload) {
+      if (tableDataManager.getSegmentPreloadExecutor() == null) {
+        LOGGER.warn("Preload cannot be enabled without segment preload 
executor for table: {}", _tableNameWithType);
+        enablePreload = false;
+      }
     }
-    _enablePreload = dedupConfig.isEnablePreload() && 
tableDataManager.getSegmentPreloadExecutor() != null;
-    HashFunction hashFunction = dedupConfig.getHashFunction();
-    File tableIndexDir = tableDataManager.getTableDataDir();
-    DedupContext.Builder dedupContextBuider = new DedupContext.Builder();
-    
dedupContextBuider.setTableConfig(tableConfig).setSchema(schema).setPrimaryKeyColumns(primaryKeyColumns)
-        
.setHashFunction(hashFunction).setEnablePreload(_enablePreload).setMetadataTTL(metadataTTL)
-        
.setDedupTimeColumn(dedupTimeColumn).setTableIndexDir(tableIndexDir).setTableDataManager(tableDataManager);
-    _dedupContext = dedupContextBuider.build();
-    LOGGER.info(
-        "Initialized {} for table: {} with primary key columns: {}, hash 
function: {}, enable preload: {}, metadata "
-            + "TTL: {}, dedup time column: {}, table index dir: {}", 
getClass().getSimpleName(), _tableNameWithType,
-        primaryKeyColumns, hashFunction, _enablePreload, metadataTTL, 
dedupTimeColumn, tableIndexDir);
+
+    // NOTE: This field doesn't follow enablement override, and always take 
instance config if set to true.

Review Comment:
   Typo in comment: should this read "... if set to `false`"?



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java:
##########
@@ -110,7 +109,8 @@ public void testValidateIfMergeRollupCanBeEnabledOrNot() {
 
     tableConfig =
         new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
-            .setDedupConfig(new DedupConfig(true, HashFunction.MD5)).build();
+            .setDedupConfig(new DedupConfig())
+            .build();

Review Comment:
   nit: please fix the formatting here?



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java:
##########
@@ -19,15 +19,17 @@
 package org.apache.pinot.spi.config.table;
 
 import com.fasterxml.jackson.annotation.JsonPropertyDescription;
-import java.util.Collections;
+import com.google.common.base.Preconditions;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.config.BaseJsonConfig;
+import org.apache.pinot.spi.utils.Enablement;
 
 
-/** Class representing upsert configuration of a table. */
+/// Class representing upsert configuration of a table.

Review Comment:
   nit: remove one `/`.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -744,17 +744,18 @@ static void validateUpsertAndDedupConfig(TableConfig 
tableConfig, Schema schema)
         Preconditions.checkState(upsertConfig.getMetadataTTL() == 0,
             "enableDeletedKeysCompactionConsistency and metadataTTL shouldn't 
exist together for upsert table");
 
-        // enableDeletedKeysCompactionConsistency shouldn't exist with 
enablePreload
-        Preconditions.checkState(!upsertConfig.isEnablePreload(),
-            "enableDeletedKeysCompactionConsistency and enablePreload 
shouldn't exist together for upsert table");
+        // enableDeletedKeysCompactionConsistency shouldn't exist with preload 
enabled
+        Preconditions.checkState(upsertConfig.getPreload() != 
Enablement.ENABLE,

Review Comment:
   Should we check `== Enablement.DISABLE` here instead? If I understand correctly, any value other than DISABLE
falls back to the server config, which could still enable preload.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1780,19 +1778,18 @@ ParallelSegmentConsumptionPolicy 
getParallelConsumptionPolicy() {
     //   - For pauseless tables, allow consumption during build, but disallow 
consumption during download
     //   - For non-pauseless tables, disallow consumption during build and 
download
     //     TODO: Revisit the non-pauseless handling
-    if (_realtimeTableDataManager.isDedupEnabled()) {
-      DedupConfig dedupConfig = _tableConfig.getDedupConfig();
-      assert dedupConfig != null;
-      if (dedupConfig.isAllowDedupConsumptionDuringCommit()) {
+    if (_partitionUpsertMetadataManager != null) {
+      UpsertContext upsertContext = 
_partitionUpsertMetadataManager.getContext();
+      if (upsertContext.isAllowPartialUpsertConsumptionDuringCommit()
+          || upsertContext.getUpsertMode() != UpsertConfig.Mode.PARTIAL) {

Review Comment:
   Hmm, good catch. IIRC, it was previously checking whether the upsert mode is partial upsert, but either way,
this check looks right now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to