singhpk234 commented on code in PR #7520:
URL: https://github.com/apache/iceberg/pull/7520#discussion_r1185192213

##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -103,6 +108,70 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS parquet_table");
   }
 
+  @Test
+  public void testSkewDelete() throws Exception {
+    createAndInitPartitionedTable();
+
+    Employee[] employees = new Employee[100];
+    for (int index = 0; index < 100; index++) {
+      employees[index] = new Employee(index, "hr");
+    }
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+
+    // set the open file cost large enough to produce a separate scan task per file
+    // use hash distribution to trigger a shuffle
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            SPLIT_OPEN_FILE_COST,
+            String.valueOf(Integer.MAX_VALUE),
+            DELETE_DISTRIBUTION_MODE,
+            DistributionMode.HASH.modeName());
+    sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
+
+    createBranchIfNeeded();
+
+    // enable AQE and set the advisory partition size small enough to trigger a split
+    // set the number of shuffle partitions to 2 to only have 2 reducers
+    withSQLConf(
+        ImmutableMap.of(
+            SQLConf.SHUFFLE_PARTITIONS().key(), "2",
+            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
+        () -> {
+          SparkPlan plan =
+              executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget());
+          Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+        });
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    Map<String, String> summary = currentSnapshot.summary();
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // CoW DELETE requests the remaining records to be clustered by `_file`
+      // each task contains only 1 file and therefore writes only 1 shuffle block
+      // that means 4 shuffle blocks are distributed among 2 reducers
+      // AQE detects that all shuffle blocks are big and processes them in 4 independent tasks
+      // otherwise, there would be 2 tasks processing 2 shuffle blocks each
+      int addedFiles = Integer.parseInt(summary.get(SnapshotSummary.ADDED_FILES_PROP));
+      Assert.assertEquals("Must produce 4 files", 4, addedFiles);
+    } else {
+      // MoR DELETE requests the deleted records to be clustered by `_spec_id` and `_partition`
+      // all tasks belong to the same partition and therefore write only 1 shuffle block per task
+      // that means there are 4 shuffle blocks, all assigned to the same reducer
+      // AQE detects that all 4 shuffle blocks are big and processes them in 4 separate tasks
+      // otherwise, there would be 1 task processing 4 shuffle blocks
+      int addedFiles = Integer.parseInt(summary.get(SnapshotSummary.ADDED_DELETE_FILES_PROP));

Review Comment:
   [minor] can use PropertyUtil here
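
A minimal sketch of that suggestion, assuming `org.apache.iceberg.util.PropertyUtil` is imported into the test; `propertyAsInt` folds the `summary.get(...)` null handling and `Integer.parseInt` into one call:

```java
// hedged sketch: PropertyUtil.propertyAsInt(Map, String, int) replaces the
// manual get + parse pair; 0 here is the default when the property is absent
int addedFiles = PropertyUtil.propertyAsInt(summary, SnapshotSummary.ADDED_DELETE_FILES_PROP, 0);
```
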
##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -103,6 +108,70 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS parquet_table");
   }
 
+  @Test
+  public void testSkewDelete() throws Exception {
+    createAndInitPartitionedTable();
+
+    Employee[] employees = new Employee[100];
+    for (int index = 0; index < 100; index++) {
+      employees[index] = new Employee(index, "hr");
+    }
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+
+    // set the open file cost large enough to produce a separate scan task per file
+    // use hash distribution to trigger a shuffle
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            SPLIT_OPEN_FILE_COST,
+            String.valueOf(Integer.MAX_VALUE),
+            DELETE_DISTRIBUTION_MODE,
+            DistributionMode.HASH.modeName());
+    sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
+
+    createBranchIfNeeded();
+
+    // enable AQE and set the advisory partition size small enough to trigger a split
+    // set the number of shuffle partitions to 2 to only have 2 reducers
+    withSQLConf(
+        ImmutableMap.of(
+            SQLConf.SHUFFLE_PARTITIONS().key(), "2",
+            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
+        () -> {
+          SparkPlan plan =
+              executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget());
+          Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+        });
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    Map<String, String> summary = currentSnapshot.summary();
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // CoW DELETE requests the remaining records to be clustered by `_file`
+      // each task contains only 1 file and therefore writes only 1 shuffle block
+      // that means 4 shuffle blocks are distributed among 2 reducers
+      // AQE detects that all shuffle blocks are big and processes them in 4 independent tasks
+      // otherwise, there would be 2 tasks processing 2 shuffle blocks each

Review Comment:
   [doubt] should we also add a UT where coalescing is happening?
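
A rough sketch of what such a coalesce test might look like, reusing the helpers visible in the surrounding class (`createAndInitPartitionedTable`, `append`, `withSQLConf`, `commitTarget`, `mode`); the large advisory partition size, the `COALESCE_PARTITIONS_ENABLED` flag, and the expected count of exactly 1 file are assumptions that would need verification:

```java
@Test
public void testCoalesceDelete() throws Exception {
  createAndInitPartitionedTable();

  // small appends so every shuffle block stays well below the advisory size
  append(tableName, new Employee(0, "hr"), new Employee(1, "hr"));
  append(tableName, new Employee(2, "hr"), new Employee(3, "hr"));

  // same setup as the skew test: one scan task per file, hash distribution
  // so the DELETE actually shuffles
  Map<String, String> tableProps =
      ImmutableMap.of(
          SPLIT_OPEN_FILE_COST,
          String.valueOf(Integer.MAX_VALUE),
          DELETE_DISTRIBUTION_MODE,
          DistributionMode.HASH.modeName());
  sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));

  createBranchIfNeeded();

  // enable AQE with a large advisory partition size so the small shuffle
  // blocks are coalesced into fewer reducers instead of being split
  withSQLConf(
      ImmutableMap.of(
          SQLConf.SHUFFLE_PARTITIONS().key(), "4",
          SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
          SQLConf.COALESCE_PARTITIONS_ENABLED().key(), "true",
          SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "256MB"),
      () -> sql("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget()));

  // assumed outcome: coalescing funnels all shuffle blocks into a single
  // write task, so 1 data file (CoW) or 1 delete file (MoR) is produced
  Table table = validationCatalog.loadTable(tableIdent);
  Map<String, String> summary = SnapshotUtil.latestSnapshot(table, branch).summary();
  String prop =
      mode(table) == COPY_ON_WRITE
          ? SnapshotSummary.ADDED_FILES_PROP
          : SnapshotSummary.ADDED_DELETE_FILES_PROP;
  Assert.assertEquals("Must produce 1 file", 1, PropertyUtil.propertyAsInt(summary, prop, 0));
}
```
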
##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -103,6 +108,70 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS parquet_table");
   }
 
+  @Test
+  public void testSkewDelete() throws Exception {
+    createAndInitPartitionedTable();
+
+    Employee[] employees = new Employee[100];
+    for (int index = 0; index < 100; index++) {
+      employees[index] = new Employee(index, "hr");
+    }
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+
+    // set the open file cost large enough to produce a separate scan task per file
+    // use hash distribution to trigger a shuffle
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            SPLIT_OPEN_FILE_COST,
+            String.valueOf(Integer.MAX_VALUE),
+            DELETE_DISTRIBUTION_MODE,
+            DistributionMode.HASH.modeName());
+    sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
+
+    createBranchIfNeeded();
+
+    // enable AQE and set the advisory partition size small enough to trigger a split
+    // set the number of shuffle partitions to 2 to only have 2 reducers
+    withSQLConf(
+        ImmutableMap.of(
+            SQLConf.SHUFFLE_PARTITIONS().key(), "2",
+            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
+        () -> {
+          SparkPlan plan =
+              executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget());
+          Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+        });
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    Map<String, String> summary = currentSnapshot.summary();
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // CoW DELETE requests the remaining records to be clustered by `_file`
+      // each task contains only 1 file and therefore writes only 1 shuffle block
+      // that means 4 shuffle blocks are distributed among 2 reducers
+      // AQE detects that all shuffle blocks are big and processes them in 4 independent tasks
+      // otherwise, there would be 2 tasks processing 2 shuffle blocks each
+      int addedFiles = Integer.parseInt(summary.get(SnapshotSummary.ADDED_FILES_PROP));
+      Assert.assertEquals("Must produce 4 files", 4, addedFiles);

Review Comment:
   [minor] can this be moved to a private func, e.g. assertAddedFiles, to use in both MoR / CoW?


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -135,6 +135,11 @@ public Distribution requiredDistribution() {
       return requiredDistribution;
     }
 
+    @Override
+    public boolean distributionStrictlyRequired() {
+      return false;

Review Comment:
   should we also check that `ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED` is true before disabling this requirement? Otherwise it will be a no-op for `OptimizeSkewInRebalancePartitions`
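
For the private-helper suggestion above, one hypothetical shape (the `assertAddedFiles` name comes from the comment itself, and it also folds in the earlier `PropertyUtil` idea):

```java
// hypothetical helper shared by the CoW and MoR branches of testSkewDelete;
// prop would be ADDED_FILES_PROP for CoW and ADDED_DELETE_FILES_PROP for MoR
private void assertAddedFiles(Map<String, String> summary, String prop, int expectedFiles) {
  int addedFiles = PropertyUtil.propertyAsInt(summary, prop, 0);
  Assert.assertEquals("Must produce " + expectedFiles + " files", expectedFiles, addedFiles);
}
```

And a sketch of the `SQLConf` guard the last comment asks about, assuming `SQLConf.get()` is reachable where the requirement is computed; this illustrates the reviewer's idea, not the PR's actual change:

```java
@Override
public boolean distributionStrictlyRequired() {
  // only relax the distribution requirement when AQE can actually split
  // skewed rebalance partitions; otherwise OptimizeSkewInRebalancePartitions
  // never runs and dropping the requirement buys nothing
  SQLConf conf = SQLConf.get();
  boolean canSplitSkewedPartitions =
      conf.adaptiveExecutionEnabled()
          && (Boolean)
              conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED());
  return !canSplitSkewedPartitions;
}
```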