pvary commented on code in PR #14435:
URL: https://github.com/apache/iceberg/pull/14435#discussion_r2754064288


##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java:
##########
@@ -2657,4 +2682,322 @@ public boolean matches(RewriteFileGroup argument) {
       return groupIDs.contains(argument.info().globalIndex());
     }
   }
+
+  @TestTemplate
+  public void testBinPackUsesCorrectRunnerBasedOnOption() {
+    assumeThat(useParquetFileMerger).isTrue();
+
+    Table table = createTable(4);
+    shouldHaveFiles(table, 4);
+
+    // Test that binPack() respects the configuration option
+    // When enabled, should use SparkParquetFileMergeRunner
+    RewriteDataFilesSparkAction actionWithMerger =
+        basicRewrite(table).option(RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE, "true");
+    actionWithMerger.binPack();
+    assertThat(actionWithMerger.runnerDescription()).isEqualTo("PARQUET-MERGE");
+
+    RewriteDataFiles.Result resultWithMerger = actionWithMerger.execute();
+
+    assertThat(resultWithMerger.rewrittenDataFilesCount()).isEqualTo(4);
+    assertThat(resultWithMerger.addedDataFilesCount()).isGreaterThan(0);
+
+    // Write more data to the table so we can test again
+    writeRecords(100, SCALE);
+
+    // When disabled, should use SparkBinPackFileRewriteRunner
+    RewriteDataFilesSparkAction actionWithoutMerger =
+        basicRewrite(table).option(RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE, "false");
+    actionWithoutMerger.binPack();
+    assertThat(actionWithoutMerger.runnerDescription()).isEqualTo("BIN-PACK");
+
+    RewriteDataFiles.Result resultWithoutMerger = actionWithoutMerger.execute();
+
+    // Should rewrite the newly added files
+    assertThat(resultWithoutMerger.rewrittenDataFilesCount()).isGreaterThan(0);
+  }
+
+  /**
+   * Test that both binpack and ParquetFileMerger convert virtual row IDs to physical and produce
+   * equivalent results for row lineage preservation.
+   */
+  @TestTemplate
+  public void testParquetFileMergerProduceConsistentRowLineageWithBinPackMerger()
+      throws IOException {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+    assumeThat(useParquetFileMerger).isTrue();
+
+    // Test binpack approach
+    Table binpackTable = createTable(4);
+    shouldHaveFiles(binpackTable, 4);
+    verifyInitialVirtualRowIds(binpackTable);
+    long binpackCountBefore = currentData().size();
+
+    RewriteDataFiles.Result binpackResult =
+        basicRewrite(binpackTable)
+            .option(RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE, "false")
+            .binPack()
+            .execute();
+
+    assertThat(binpackResult.rewrittenDataFilesCount()).isEqualTo(4);
+    assertThat(binpackResult.addedDataFilesCount()).isGreaterThan(0);
+    assertThat(currentData()).hasSize((int) binpackCountBefore);
+    verifyPhysicalRowIdsAfterMerge(binpackTable, "Binpack");
+
+    // Test ParquetFileMerger approach with a different table location
+    String originalTableLocation = tableLocation;
+    tableLocation = new File(tableDir, "merger-table").toURI().toString();
+    Table mergerTable = createTable(4);
+    shouldHaveFiles(mergerTable, 4);
+    verifyInitialVirtualRowIds(mergerTable);
+    long mergerCountBefore = currentData().size();
+
+    RewriteDataFiles.Result mergerResult =
+        basicRewrite(mergerTable)
+            .option(RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE, "true")
+            .binPack()
+            .execute();
+
+    assertThat(mergerResult.rewrittenDataFilesCount()).isEqualTo(4);
+    assertThat(mergerResult.addedDataFilesCount()).isGreaterThan(0);
+    assertThat(currentData()).hasSize((int) mergerCountBefore);
+    verifyPhysicalRowIdsAfterMerge(mergerTable, "ParquetFileMerger");
+
+    // Restore original table location
+    tableLocation = originalTableLocation;
+
+    // Verify both approaches produce equivalent results
+    assertThat(binpackCountBefore)
+        .as("Both tables should have same initial record count")
+        .isEqualTo(mergerCountBefore);
+    assertThat(binpackResult.addedDataFilesCount())
+        .as("Both approaches should produce same number of output files")
+        .isEqualTo(mergerResult.addedDataFilesCount());
+  }
+
+  private void verifyInitialVirtualRowIds(Table table) throws IOException {
+    List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+    assertThat(dataFiles).isNotEmpty();
+    for (DataFile dataFile : dataFiles) {
+      assertThat(dataFile.firstRowId())
+          .as("Files should have virtual row IDs (first_row_id != null)")
+          .isNotNull();
+
+      // Verify files don't have physical _row_id column
+      ParquetFileReader reader =
+          ParquetFileReader.open(
+              HadoopInputFile.fromPath(
+                  new Path(dataFile.path().toString()), spark.sessionState().newHadoopConf()));
+      MessageType schema = reader.getFooter().getFileMetaData().getSchema();
+      assertThat(schema.containsField("_row_id"))
+          .as("Files should not have physical _row_id column before merge")
+          .isFalse();
+      reader.close();
+    }
+  }
+
+  private void verifyPhysicalRowIdsAfterMerge(Table table, String approach) throws IOException {
+    List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+    assertThat(dataFiles).isNotEmpty();
+    for (DataFile dataFile : dataFiles) {
+      assertThat(dataFile.firstRowId())
+          .as(approach + " should extract firstRowId from min(_row_id) column statistics")
+          .isNotNull();
+
+      // Verify files have physical _row_id column
+      ParquetFileReader reader =
+          ParquetFileReader.open(
+              HadoopInputFile.fromPath(
+                  new Path(dataFile.path().toString()), spark.sessionState().newHadoopConf()));
+      MessageType schema = reader.getFooter().getFileMetaData().getSchema();
+      assertThat(schema.containsField("_row_id"))
+          .as(approach + " should write physical _row_id column")
+          .isTrue();
+      reader.close();
+    }
+  }
+
+  /**
+   * Test that ParquetFileMerger can generate physical _row_id columns when merging files with
+   * virtual row IDs (metadata only).
+   */
+  @TestTemplate
+  public void testParquetFileMergerGeneratesPhysicalRowIds() throws IOException {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    Table table = createTable(4);
+    shouldHaveFiles(table, 4);
+
+    // Merge files with virtual row IDs - should generate physical _row_id columns
+    RewriteDataFiles.Result result =
+        basicRewrite(table)
+            .option(
+                RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE, String.valueOf(useParquetFileMerger))
+            .binPack()
+            .execute();
+
+    assertThat(result.rewrittenDataFilesCount()).isEqualTo(4);
+    assertThat(result.addedDataFilesCount()).isGreaterThan(0);
+
+    // Verify files have physical _row_id column and first_row_id set
+    // Note: For V3+ tables, firstRowId is required in metadata even with physical _row_id column
+    List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+    assertThat(dataFiles).isNotEmpty();
+    for (DataFile dataFile : dataFiles) {
+      assertThat(dataFile.firstRowId())
+          .as("Files should have first_row_id set after merge (required for 
V3+)")
+          .isNotNull();
+
+      ParquetFileReader reader =
+          ParquetFileReader.open(
+              HadoopInputFile.fromPath(
+                  new Path(dataFile.path().toString()), spark.sessionState().newHadoopConf()));
+      MessageType schema = reader.getFooter().getFileMetaData().getSchema();
+      assertThat(schema.containsField("_row_id"))
+          .as("Files should have physical _row_id column after merge")
+          .isTrue();
+      reader.close();
+    }
+  }
+
+  /**
+   * Test that ParquetFileMerger preserves existing physical _row_id columns via binary copy when
+   * merging files that already have physical row IDs.
+   */
+  @TestTemplate
+  public void testParquetFileMergerPreservesPhysicalRowIds() throws IOException {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    // Create a V3+ table and do an initial merge to create files with physical row IDs
+    Table table = createTable(4);
+    shouldHaveFiles(table, 4);
+
+    // First merge: converts virtual row IDs to physical
+    RewriteDataFiles.Result firstMerge =
+        basicRewrite(table)
+            .option(
+                RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE, String.valueOf(useParquetFileMerger))
+            .binPack()
+            .execute();
+
+    assertThat(firstMerge.rewrittenDataFilesCount()).isEqualTo(4);
+    assertThat(firstMerge.addedDataFilesCount()).isGreaterThan(0);
+
+    // Add more data to create additional files (without physical row IDs)
+    writeRecords(2, SCALE);
+    List<DataFile> filesBeforeSecondMerge = TestHelpers.dataFiles(table);
+    int expectedFileCount = filesBeforeSecondMerge.size();
+
+    long countBefore = currentData().size();
+
+    // Second merge: should preserve physical row IDs via binary copy for files that have them
+    RewriteDataFiles.Result secondMerge =
+        basicRewrite(table)
+            .option(
+                RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE, String.valueOf(useParquetFileMerger))
+            .binPack()
+            .execute();
+
+    assertThat(secondMerge.rewrittenDataFilesCount()).isGreaterThan(0);
+    assertThat(secondMerge.addedDataFilesCount()).isGreaterThan(0);
+    assertThat(currentData()).hasSize((int) countBefore);
+
+    // Verify merged files still have physical _row_id column and firstRowId from statistics
+    // Same as binpack approach: firstRowId is extracted from min(_row_id) column statistics
+    List<DataFile> dataFilesAfterSecondMerge = TestHelpers.dataFiles(table);
+    assertThat(dataFilesAfterSecondMerge).isNotEmpty();
+    for (DataFile dataFile : dataFilesAfterSecondMerge) {
+      assertThat(dataFile.firstRowId())
+          .as(
+              "Merged files should have firstRowId extracted from _row_id 
column statistics (same as binpack)")
+          .isNotNull();
+
+      // Verify files still have physical _row_id column
+      ParquetFileReader reader =
+          ParquetFileReader.open(
+              HadoopInputFile.fromPath(
+                  new Path(dataFile.path().toString()), spark.sessionState().newHadoopConf()));
+      MessageType schema = reader.getFooter().getFileMetaData().getSchema();
+      assertThat(schema.containsField("_row_id"))
+          .as("Merged files should preserve physical _row_id column")
+          .isTrue();
+      reader.close();
+    }
+  }
+
+  /** Test that row lineage preservation works correctly with partitioned tables. */

Review Comment:
   Please update the comment, as the test now runs with both mergers.
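
   For example, the javadoc could mention both runners explicitly; something along these lines (just a wording suggestion, not the exact text to use):

     /** Test that row lineage preservation works correctly with partitioned tables, for both the bin-pack and Parquet file merge runners. */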



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

