This is an automated email from the ASF dual-hosted git repository.
Jackie-Jiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 91c1c937100 Fix testCommitTimeCompactionPreservesDeletedRecords ignoring delete record column (#18473)
91c1c937100 is described below
commit 91c1c93710010ebd5870f61ca5ad3562a0c23dd6
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue May 12 09:10:54 2026 -0700
Fix testCommitTimeCompactionPreservesDeletedRecords ignoring delete record column (#18473)
---
.../tests/CommitTimeCompactionIntegrationTest.java | 30 +++++++++++-----------
1 file changed, 15 insertions(+), 15 deletions(-)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CommitTimeCompactionIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CommitTimeCompactionIntegrationTest.java
index 30d8561c420..fb41b73f56d 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CommitTimeCompactionIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CommitTimeCompactionIntegrationTest.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.client.ExecutionStats;
import org.apache.pinot.client.ResultSet;
@@ -957,6 +958,13 @@ public class CommitTimeCompactionIntegrationTest extends BaseClusterIntegrationT
private TableConfig createUpsertTable(String tableName, String
kafkaTopicName, UpsertConfig.Mode upsertMode,
boolean enableCommitTimeCompaction, boolean
enableColumnMajorSegmentBuilder)
throws Exception {
+ return createUpsertTable(tableName, kafkaTopicName, upsertMode,
enableCommitTimeCompaction,
+ enableColumnMajorSegmentBuilder, null);
+ }
+
+ private TableConfig createUpsertTable(String tableName, String
kafkaTopicName, UpsertConfig.Mode upsertMode,
+ boolean enableCommitTimeCompaction, boolean
enableColumnMajorSegmentBuilder, @Nullable String deleteRecordColumn)
+ throws Exception {
// Create schema
Schema schema = createSchema();
schema.setSchemaName(tableName);
@@ -965,6 +973,9 @@ public class CommitTimeCompactionIntegrationTest extends BaseClusterIntegrationT
// Create upsert config
UpsertConfig upsertConfig = new UpsertConfig(upsertMode);
upsertConfig.setEnableCommitTimeCompaction(enableCommitTimeCompaction);
+ if (deleteRecordColumn != null) {
+ upsertConfig.setDeleteRecordColumn(deleteRecordColumn);
+ }
// Create table config
Map<String, String> csvDecoderProperties =
getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER);
@@ -1264,27 +1275,16 @@ public class CommitTimeCompactionIntegrationTest extends BaseClusterIntegrationT
// TABLE 1: With commit-time compaction DISABLED (baseline) and delete
record column configured
String tableNameWithoutCompaction =
"gameScoresDeletedRecordsCompactionDisabled";
- TableConfig tableConfigWithoutCompaction =
createUpsertTable(tableNameWithoutCompaction, kafkaTopicName,
- UpsertConfig.Mode.FULL, false, false);
- // Configure delete record column for soft deletes
- tableConfigWithoutCompaction.getUpsertConfig().setDeleteRecordColumn("deleted");
- updateTableConfig(tableConfigWithoutCompaction);
+ createUpsertTable(tableNameWithoutCompaction, kafkaTopicName,
UpsertConfig.Mode.FULL, false, false, "deleted");
// TABLE 2: With commit-time compaction ENABLED + row-major build and
delete record column configured
String tableNameWithCompaction =
"gameScoresDeletedRecordsCompactionEnabled";
- TableConfig tableConfigWithCompaction =
createUpsertTable(tableNameWithCompaction, kafkaTopicName,
- UpsertConfig.Mode.FULL, true, false);
- // Configure delete record column for soft deletes
- tableConfigWithCompaction.getUpsertConfig().setDeleteRecordColumn("deleted");
- updateTableConfig(tableConfigWithCompaction);
+ createUpsertTable(tableNameWithCompaction, kafkaTopicName,
UpsertConfig.Mode.FULL, true, false, "deleted");
// TABLE 3: With commit-time compaction ENABLED + column-major build and
delete record column configured
String tableNameWithCompactionColumnMajor =
"gameScoresDeletedRecordsCompactionColumnMajor";
- TableConfig tableConfigWithCompactionColumnMajor =
createUpsertTable(tableNameWithCompactionColumnMajor,
- kafkaTopicName, UpsertConfig.Mode.FULL, true, true);
- // Configure delete record column for soft deletes
- tableConfigWithCompactionColumnMajor.getUpsertConfig().setDeleteRecordColumn("deleted");
- updateTableConfig(tableConfigWithCompactionColumnMajor);
+ createUpsertTable(tableNameWithCompactionColumnMajor, kafkaTopicName,
UpsertConfig.Mode.FULL, true, true,
+ "deleted");
// Wait for all three tables to load the same initial data (3 unique
records after upserts)
waitForAllDocsLoaded(tableNameWithoutCompaction, tableNameWithCompaction,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]