[ 
https://issues.apache.org/jira/browse/GOBBLIN-2159?focusedWorklogId=939634&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-939634
 ]

ASF GitHub Bot logged work on GOBBLIN-2159:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Oct/24 07:47
            Start Date: 23/Oct/24 07:47
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4058:
URL: https://github.com/apache/gobblin/pull/4058#discussion_r1811978448


##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetTest.java:
##########
@@ -190,8 +165,8 @@ public void testMultipleCopyEntitiesGenerated() throws IOException {
     srcFilePaths.add(SRC_WRITE_LOCATION + "/file4.orc");
     srcFilePaths.add(SRC_WRITE_LOCATION + "/file5.orc");
 
-    List<DataFile> srcDataFiles = getDataFiles();
-    Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(srcDataFiles);
+    List<DataFile> mockSrcDataFiles = createDataFileMocks();
+    Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(mockSrcDataFiles);

Review Comment:
   this mocking setup repeats several times.  could it live in a `@BeforeMethod`?
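   A rough sketch of that consolidation, assuming TestNG (which this class already uses) and that the canned paths can be shared across tests; the method name is illustrative, and since each test currently adds its own `srcFilePaths` first, the stubbing might equally end up in a small helper called after those paths are populated:
   ```
   @BeforeMethod
   public void stubSrcDataFiles() throws IOException {
     // shared stubbing: the source table mock hands back the same canned DataFile mocks for any predicate
     List<DataFile> mockSrcDataFiles = createDataFileMocks();
     Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(mockSrcDataFiles);
   }
   ```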



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java:
##########
@@ -226,6 +226,90 @@ public void testNewTablePropertiesAreRegistered() throws Exception {
     catalog.dropTable(destTableId);
   }
 
+  /** Verify that getPartitionSpecificDataFiles return datafiles belonging to the partition defined by predicate */
+  @Test
+  public void testGetPartitionSpecificDataFiles() throws IOException {
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/file3.orc",
+        "/path/tableName/data/id=2/file5.orc",
+        "/path/tableName/data/file4.orc",
+        "/path/tableName/data/id=3/file2.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, "1");
+    Map<String, PartitionData> pathsWithPartitionData = Maps.newHashMap();
+    paths.forEach(path -> pathsWithPartitionData.put(path, partitionData));
+
+    addPartitionDataFiles(table, createDataFiles(pathsWithPartitionData));
+
+    IcebergTable icebergTable = new IcebergTable(tableId,
+        catalog.newTableOps(tableId),
+        catalogUri,
+        catalog.loadTable(tableId));
+    // Using AlwaysTrue & AlwaysFalse Predicate to avoid mocking of predicate class
+    Predicate<StructLike> alwaysTruePredicate = partition -> true;
+    Predicate<StructLike> alwaysFalsePredicate = partition -> false;
+    Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(), 5);
+    Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(), 0);
+  }
+
+  /** Verify that overwritePartition replace data files belonging to given partition col and value */
+  @Test
+  public void testOverwritePartition() throws IOException {
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/file2.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType());

Review Comment:
   nit: `partition1Data` (esp. since you use it again below w/ `paths3`)
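   Concretely, the rename would only touch the two lines from this hunk:
   ```
   PartitionData partition1Data = new PartitionData(icebergPartitionSpec.partitionType());
   partition1Data.set(0, "1");
   ```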



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java:
##########
@@ -183,16 +187,20 @@ private Path addUUIDToPath(String filePathStr) {
     return new Path(fileDir, newFileName);
   }
 
-  private Map<Path, FileStatus> getDestFilePathWithSrcFileStatus(List<DataFile> srcDataFiles,
-      List<DataFile> destDataFiles, FileSystem fs) throws IOException {
-    Map<Path, FileStatus> results = Maps.newHashMap();
-    for (int i = 0; i < srcDataFiles.size(); i++) {
-      Path srcPath = new Path(srcDataFiles.get(i).path().toString());
-      Path destPath = new Path(destDataFiles.get(i).path().toString());
-      FileStatus srcFileStatus = fs.getFileStatus(srcPath);
-      results.put(destPath, srcFileStatus);
-    }
-    return results;
+  private Map<Path, FileStatus> calcSrcFileStatusByDestFilePath(Map<Path, DataFile> destDataFileBySrcPath) {
+    Map<Path, FileStatus> srcFileStatusByDestFilePath = Maps.newHashMap();
+    destDataFileBySrcPath.forEach((srcPath, destDataFile) -> {
+      FileStatus srcFileStatus;
+      try {
+        srcFileStatus = this.sourceFs.getFileStatus(srcPath);
+      } catch (IOException e) {
+        String errMsg = String.format("~%s~ Failed to get file status for path : %s", this.getFileSetId(), srcPath);
+        log.error(errMsg);
+        throw new RuntimeException(errMsg, e);
+      }

Review Comment:
   I **really** wish `java.util.function.*` played along better w/ checked exceptions... but that's clearly not the case... \*sigh\*
   
   throwing `IOException` is actually a key part of [the `FileSet` "contract"](https://github.com/apache/gobblin/blob/5b495aff5d5bbe840add9f2db76ca944dffc69aa/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/partition/FileSet.java#L142), so substituting an unchecked `RuntimeException` (that no caller expects and would NOT be looking out for) is not something we ought to do at this late stage.
   
   instead, either write this iteratively (using `for`-each loop) or follow [`IcebergDataset`'s use](https://github.com/apache/gobblin/blob/af4b822a1e2f79721cdff617f1581e77ff261580/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java#L205) of [`CheckedExceptionFunction.wrapToTunneled`](https://github.com/apache/gobblin/blob/585298fb5ebc074f69c1b9db87de6186c4855b26/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionFunction.java#L69)
   ```
   try {
     ...
   } catch (CheckedExceptionFunction.WrappedIOException wrapper) {
     wrapper.rethrowWrapped();
   }
   ```
   
   the code there actually uses:
   ```
   copyConfig.getCopyContext().getFileStatus(targetFs, new Path(pathStr)).isPresent()
   ```
   for caching, which shouldn't be necessary here, given `IcebergTable::getPartitionSpecificDataFiles` examines only a single snapshot.
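   To make that concrete, here is a rough sketch of the helper rewritten with the wrap/tunnel pattern (method and field names come from this diff; the `wrapToTunneled` usage is inferred from the linked code and a `java.util.function.Function` import is assumed, so treat this as illustrative rather than the required fix):
   ```
   private Map<Path, FileStatus> calcSrcFileStatusByDestFilePath(Map<Path, DataFile> destDataFileBySrcPath)
       throws IOException {
     // tunnel the checked IOException through the lambda, then rethrow it to honor the FileSet contract
     Function<Path, FileStatus> getSrcFileStatus = CheckedExceptionFunction.wrapToTunneled(this.sourceFs::getFileStatus);
     Map<Path, FileStatus> srcFileStatusByDestFilePath = Maps.newHashMap();
     try {
       destDataFileBySrcPath.forEach((srcPath, destDataFile) ->
           srcFileStatusByDestFilePath.put(new Path(destDataFile.path().toString()), getSrcFileStatus.apply(srcPath)));
     } catch (CheckedExceptionFunction.WrappedIOException wrapper) {
       wrapper.rethrowWrapped();  // surfaces the original IOException to the caller
     }
     return srcFileStatusByDestFilePath;
   }
   ```
   (A plain `for`-each over `destDataFileBySrcPath.entrySet()` that calls `this.sourceFs.getFileStatus(...)` directly and simply declares `throws IOException` would serve equally well.)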



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetTest.java:
##########
@@ -75,9 +73,10 @@ public class IcebergPartitionDatasetTest {
   private static final String DEST_WRITE_LOCATION = DEST_TEST_DB + "/" + DEST_TEST_TABLE + "/data";
   private static final String TEST_ICEBERG_PARTITION_COLUMN_NAME = "testPartition";
   private static final String TEST_ICEBERG_PARTITION_COLUMN_VALUE = "testValue";
+  private static final String OVERWRITE_COMMIT_STEP = "org.apache.gobblin.data.management.copy.iceberg.IcebergOverwritePartitionsStep";

Review Comment:
   nit: `IcebergOverwritePartitionsStep.class.getName()`
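   Spelled out, the suggested constant would read (same value, but the compiler now checks the class reference):
   ```
   private static final String OVERWRITE_COMMIT_STEP = IcebergOverwritePartitionsStep.class.getName();
   ```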



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java:
##########
@@ -226,6 +226,90 @@ public void testNewTablePropertiesAreRegistered() throws Exception {
     catalog.dropTable(destTableId);
   }
 
+  /** Verify that getPartitionSpecificDataFiles return datafiles belonging to the partition defined by predicate */
+  @Test
+  public void testGetPartitionSpecificDataFiles() throws IOException {
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/file3.orc",
+        "/path/tableName/data/id=2/file5.orc",
+        "/path/tableName/data/file4.orc",
+        "/path/tableName/data/id=3/file2.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, "1");
+    Map<String, PartitionData> pathsWithPartitionData = Maps.newHashMap();
+    paths.forEach(path -> pathsWithPartitionData.put(path, partitionData));
+
+    addPartitionDataFiles(table, createDataFiles(pathsWithPartitionData));
+
+    IcebergTable icebergTable = new IcebergTable(tableId,
+        catalog.newTableOps(tableId),
+        catalogUri,
+        catalog.loadTable(tableId));
+    // Using AlwaysTrue & AlwaysFalse Predicate to avoid mocking of predicate class
+    Predicate<StructLike> alwaysTruePredicate = partition -> true;
+    Predicate<StructLike> alwaysFalsePredicate = partition -> false;
+    Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(), 5);
+    Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(), 0);
+  }
+
+  /** Verify that overwritePartition replace data files belonging to given partition col and value */
+  @Test
+  public void testOverwritePartition() throws IOException {
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/file2.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, "1");
+    Map<String, PartitionData> pathsWithPartitionData = Maps.newHashMap();
+    paths.forEach(path -> pathsWithPartitionData.put(path, partitionData));
+
+    addPartitionDataFiles(table, createDataFiles(pathsWithPartitionData));
+
+    IcebergTable icebergTable = new IcebergTable(tableId,
+        catalog.newTableOps(tableId),
+        catalogUri,
+        catalog.loadTable(tableId));
+
+    verifyAnyOrder(paths, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match");
+
+    List<String> paths2 = Arrays.asList(
+        "/path/tableName/data/file3.orc",
+        "/path/tableName/data/id=2/file4.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData2 = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData2.set(0, "2");
+    Map<String, PartitionData> paths2WithPartitionData2 = Maps.newHashMap();
+    paths2.forEach(path -> paths2WithPartitionData2.put(path, partitionData2));
+
+    List<DataFile> partition2DataFiles = createDataFiles(paths2WithPartitionData2);
+    // here, since partition data with value 2 doesn't exist yet,
+    // we expect it to get added to the table, w/o changing or deleting any other partitions
+    icebergTable.overwritePartition(partition2DataFiles, "id", "2");
+    List<String> expectedPaths2 = new ArrayList<>(paths);
+    expectedPaths2.addAll(paths2);
+    verifyAnyOrder(expectedPaths2, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match");
+
+    List<String> paths3 = Arrays.asList(
+        "/path/tableName/data/id=2/file5.orc",

Review Comment:
   suggest a note that this naming convention is irrelevant to partition membership, as evidenced by `"/path/tableName/data/id=2/file4.orc"` also in `paths2`
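   For instance, such a note might read (wording here is only a suggestion, not from the PR):
   ```
   // NOTE: the "id=2" directory in the path is purely cosmetic; partition membership
   // comes from the PartitionData attached to each DataFile, not from the file path
   ```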



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java:
##########
@@ -119,11 +117,13 @@ public void testGetCurrentSnapshotInfo() throws IOException {
   }
 
   /** Verify failure when attempting to get current snapshot info for non-existent table */
-  @Test(expectedExceptions = {IcebergTable.TableNotFoundException.class, NoSuchTableException.class})
+  @Test(expectedExceptions = {IcebergTable.TableNotFoundException.class})
   public void testGetCurrentSnapshotInfoOnBogusTable() throws IOException {
     TableIdentifier bogusTableId = TableIdentifier.of(dbName, tableName + "_BOGUS");
+    // Passing null for Table as catalog.loadTable(bogusTableId) will throw NoSuchTableException so

Review Comment:
   typo on the exception class name



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetTest.java:
##########
@@ -225,39 +199,57 @@ public void testWithDifferentSrcAndDestTableWriteLocation() throws IOException {
     List<CopyEntity> copyEntities =
         (List<CopyEntity>) icebergPartitionDataset.generateCopyEntities(targetFs, copyConfiguration);
 
-    Assert.assertEquals(copyEntities.size(), 2);
-    verifyCopyEntities(copyEntities, false);
+    verifyCopyEntities(copyEntities, 2, false);
   }
 
-  private List<DataFile> getDataFiles() throws IOException {
+  private static void setupSrcFileSystem() throws IOException {
+    sourceFs = Mockito.mock(FileSystem.class);
+    Mockito.when(sourceFs.getUri()).thenReturn(SRC_FS_URI);
+    Mockito.when(sourceFs.makeQualified(any(Path.class)))
+        .thenAnswer(invocation -> invocation.getArgument(0, Path.class).makeQualified(SRC_FS_URI, new Path("/")));
+    Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenAnswer(invocation -> {
+      Path path = invocation.getArgument(0, Path.class);
+      Path qualifiedPath = sourceFs.makeQualified(path);
+      return IcebergDatasetTest.MockFileSystemBuilder.createEmptyFileStatus(qualifiedPath.toString());
+    });
+  }
+
+  private static void setupDestFileSystem() throws IOException {
+    targetFs = Mockito.mock(FileSystem.class);
+    Mockito.when(targetFs.getUri()).thenReturn(DEST_FS_URI);
+    Mockito.when(targetFs.makeQualified(any(Path.class)))
+        .thenAnswer(invocation -> invocation.getArgument(0, Path.class).makeQualified(DEST_FS_URI, new Path("/")));
+    // Since we are adding UUID to the file name for every file while creating destination path,
+    // so return file not found exception if trying to find file status on destination file system
+    Mockito.when(targetFs.getFileStatus(any(Path.class))).thenThrow(new FileNotFoundException());
+  }
+
+  private static List<DataFile> createDataFileMocks() throws IOException {
     List<DataFile> dataFiles = new ArrayList<>();
     for (String srcFilePath : srcFilePaths) {
       DataFile dataFile = Mockito.mock(DataFile.class);
       Path dataFilePath = new Path(srcFilePath);
       Path qualifiedPath = sourceFs.makeQualified(dataFilePath);
       Mockito.when(dataFile.path()).thenReturn(dataFilePath.toString());
-      Mockito.when(sourceFs.getFileStatus(Mockito.eq(dataFilePath))).thenReturn(getFileStatus(qualifiedPath));
+      Mockito.when(sourceFs.getFileStatus(Mockito.eq(dataFilePath))).thenReturn(
+          IcebergDatasetTest.MockFileSystemBuilder.createEmptyFileStatus(qualifiedPath.toString()));
       dataFiles.add(dataFile);
     }
     return dataFiles;
   }
 
-  private static FileStatus getFileStatus(Path path) {
-    FileStatus fileStatus = new FileStatus();
-    fileStatus.setPath(path);
-    return fileStatus;
-  }
-
-  private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, boolean sameSrcAndDestWriteLocation) {
+  private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, int expectedCopyEntitiesSize,

Review Comment:
   taking in the expected size is a definite improvement, but let's go further to also take in the `expectedSrcFilePaths` - please [see my reply](https://github.com/apache/gobblin/pull/4058#discussion_r1811938323)
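   One possible shape for the extended helper (hypothetical; how the source paths get pulled out of each `CopyEntity` is left open here):
   ```
   private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, List<String> expectedSrcFilePaths,
       int expectedCopyEntitiesSize, boolean sameSrcAndDestWriteLocation) {
     Assert.assertEquals(copyEntities.size(), expectedCopyEntitiesSize);
     // ... then assert (in any order) that the src file paths referenced by the copy entities match expectedSrcFilePaths
   }
   ```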



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java:
##########
@@ -93,6 +93,7 @@ public class IcebergDatasetTest {
   private static final String MANIFEST_PATH_0 = ROOT_PATH + "metadata/manifest.a";
   private static final String MANIFEST_DATA_PATH_0A = ROOT_PATH + "data/p0/a";
   private static final String MANIFEST_DATA_PATH_0B = ROOT_PATH + "data/p0/b";
+  private static final String REGISTER_COMMIT_STEP = "org.apache.gobblin.data.management.copy.iceberg.IcebergRegisterStep";

Review Comment:
   `IcebergRegisterStep.class.getName()` will engage the type checker, whereas a string literal would not
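   As with the earlier constant, that would simply become:
   ```
   private static final String REGISTER_COMMIT_STEP = IcebergRegisterStep.class.getName();
   ```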



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java:
##########
@@ -226,6 +226,90 @@ public void testNewTablePropertiesAreRegistered() throws Exception {
     catalog.dropTable(destTableId);
   }
 
+  /** Verify that getPartitionSpecificDataFiles return datafiles belonging to the partition defined by predicate */
+  @Test
+  public void testGetPartitionSpecificDataFiles() throws IOException {
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/file3.orc",
+        "/path/tableName/data/id=2/file5.orc",
+        "/path/tableName/data/file4.orc",
+        "/path/tableName/data/id=3/file2.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, "1");
+    Map<String, PartitionData> pathsWithPartitionData = Maps.newHashMap();
+    paths.forEach(path -> pathsWithPartitionData.put(path, partitionData));
+
+    addPartitionDataFiles(table, createDataFiles(pathsWithPartitionData));
+
+    IcebergTable icebergTable = new IcebergTable(tableId,
+        catalog.newTableOps(tableId),
+        catalogUri,
+        catalog.loadTable(tableId));
+    // Using AlwaysTrue & AlwaysFalse Predicate to avoid mocking of predicate class
+    Predicate<StructLike> alwaysTruePredicate = partition -> true;
+    Predicate<StructLike> alwaysFalsePredicate = partition -> false;
+    Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(), 5);
+    Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(), 0);
+  }
+
+  /** Verify that overwritePartition replace data files belonging to given partition col and value */
+  @Test
+  public void testOverwritePartition() throws IOException {
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/file2.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, "1");
+    Map<String, PartitionData> pathsWithPartitionData = Maps.newHashMap();
+    paths.forEach(path -> pathsWithPartitionData.put(path, partitionData));
+
+    addPartitionDataFiles(table, createDataFiles(pathsWithPartitionData));
+
+    IcebergTable icebergTable = new IcebergTable(tableId,
+        catalog.newTableOps(tableId),
+        catalogUri,
+        catalog.loadTable(tableId));
+
+    verifyAnyOrder(paths, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match");
+
+    List<String> paths2 = Arrays.asList(
+        "/path/tableName/data/file3.orc",
+        "/path/tableName/data/id=2/file4.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData2 = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData2.set(0, "2");
+    Map<String, PartitionData> paths2WithPartitionData2 = Maps.newHashMap();
+    paths2.forEach(path -> paths2WithPartitionData2.put(path, partitionData2));
+
+    List<DataFile> partition2DataFiles = createDataFiles(paths2WithPartitionData2);
+    // here, since partition data with value 2 doesn't exist yet,
+    // we expect it to get added to the table, w/o changing or deleting any other partitions
+    icebergTable.overwritePartition(partition2DataFiles, "id", "2");
+    List<String> expectedPaths2 = new ArrayList<>(paths);
+    expectedPaths2.addAll(paths2);
+    verifyAnyOrder(expectedPaths2, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match");
+
+    List<String> paths3 = Arrays.asList(
+        "/path/tableName/data/id=2/file5.orc",
+        "/path/tableName/data/file6.orc"
+    );
+    // Reusing same partition data to create data file with different paths
+    Map<String, PartitionData> paths3WithPartitionData = Maps.newHashMap();
+    paths3.forEach(path -> paths3WithPartitionData.put(path, partitionData));
+    List<DataFile> partition1NewDataFiles = createDataFiles(paths3WithPartitionData);

Review Comment:
   NBD, but for a one-liner:
   ```
   List<DataFile> partition1NewDataFiles = createDataFiles(paths3.stream().collect(Collectors.toMap(x -> x, x -> partition1Data)));
   ```
   (alternative to `x -> x` is `Function.identity()` - your choice)





Issue Time Tracking
-------------------

    Worklog Id:     (was: 939634)
    Time Spent: 10h 40m  (was: 10.5h)

> Support Partition Based Copy in Iceberg Distcp
> ----------------------------------------------
>
>                 Key: GOBBLIN-2159
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2159
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Vivek Rai
>            Priority: Major
>          Time Spent: 10h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
