[
https://issues.apache.org/jira/browse/GOBBLIN-2159?focusedWorklogId=939634&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-939634
]
ASF GitHub Bot logged work on GOBBLIN-2159:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 23/Oct/24 07:47
Start Date: 23/Oct/24 07:47
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4058:
URL: https://github.com/apache/gobblin/pull/4058#discussion_r1811978448
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetTest.java:
##########
@@ -190,8 +165,8 @@ public void testMultipleCopyEntitiesGenerated() throws IOException {
     srcFilePaths.add(SRC_WRITE_LOCATION + "/file4.orc");
     srcFilePaths.add(SRC_WRITE_LOCATION + "/file5.orc");
-    List<DataFile> srcDataFiles = getDataFiles();
-    Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(srcDataFiles);
+    List<DataFile> mockSrcDataFiles = createDataFileMocks();
+    Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(mockSrcDataFiles);
Review Comment:
this mocking setup repeats several times. could it live in a
`@BeforeMethod`?
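A sketch of how that shared setup might look, reusing the `srcIcebergTable` mock and `createDataFileMocks()` helper from this test class (whether `srcFilePaths` is already populated at that point is an open question):
```
@BeforeMethod
public void stubSrcPartitionDataFiles() throws IOException {
  // hypothetical shared setup; reuses the mocks/helpers defined in this test class
  List<DataFile> mockSrcDataFiles = createDataFileMocks();
  Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any()))
      .thenReturn(mockSrcDataFiles);
}
```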
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java:
##########
@@ -226,6 +226,90 @@ public void testNewTablePropertiesAreRegistered() throws Exception {
     catalog.dropTable(destTableId);
   }
+  /** Verify that getPartitionSpecificDataFiles return datafiles belonging to the partition defined by predicate */
+  @Test
+  public void testGetPartitionSpecificDataFiles() throws IOException {
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/file3.orc",
+        "/path/tableName/data/id=2/file5.orc",
+        "/path/tableName/data/file4.orc",
+        "/path/tableName/data/id=3/file2.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, "1");
+    Map<String, PartitionData> pathsWithPartitionData = Maps.newHashMap();
+    paths.forEach(path -> pathsWithPartitionData.put(path, partitionData));
+
+    addPartitionDataFiles(table, createDataFiles(pathsWithPartitionData));
+
+    IcebergTable icebergTable = new IcebergTable(tableId,
+        catalog.newTableOps(tableId),
+        catalogUri,
+        catalog.loadTable(tableId));
+    // Using AlwaysTrue & AlwaysFalse Predicate to avoid mocking of predicate class
+    Predicate<StructLike> alwaysTruePredicate = partition -> true;
+    Predicate<StructLike> alwaysFalsePredicate = partition -> false;
+    Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(), 5);
+    Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(), 0);
+  }
+
+  /** Verify that overwritePartition replace data files belonging to given partition col and value */
+  @Test
+  public void testOverwritePartition() throws IOException {
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/file2.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType());
Review Comment:
nit: `partition1Data` (esp. since you use it again below w/ `paths3`)
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java:
##########
@@ -183,16 +187,20 @@ private Path addUUIDToPath(String filePathStr) {
     return new Path(fileDir, newFileName);
   }
-  private Map<Path, FileStatus> getDestFilePathWithSrcFileStatus(List<DataFile> srcDataFiles,
-      List<DataFile> destDataFiles, FileSystem fs) throws IOException {
-    Map<Path, FileStatus> results = Maps.newHashMap();
-    for (int i = 0; i < srcDataFiles.size(); i++) {
-      Path srcPath = new Path(srcDataFiles.get(i).path().toString());
-      Path destPath = new Path(destDataFiles.get(i).path().toString());
-      FileStatus srcFileStatus = fs.getFileStatus(srcPath);
-      results.put(destPath, srcFileStatus);
-    }
-    return results;
+  private Map<Path, FileStatus> calcSrcFileStatusByDestFilePath(Map<Path, DataFile> destDataFileBySrcPath) {
+    Map<Path, FileStatus> srcFileStatusByDestFilePath = Maps.newHashMap();
+    destDataFileBySrcPath.forEach((srcPath, destDataFile) -> {
+      FileStatus srcFileStatus;
+      try {
+        srcFileStatus = this.sourceFs.getFileStatus(srcPath);
+      } catch (IOException e) {
+        String errMsg = String.format("~%s~ Failed to get file status for path : %s", this.getFileSetId(), srcPath);
+        log.error(errMsg);
+        throw new RuntimeException(errMsg, e);
+      }
Review Comment:
I **really** wish `java.util.function.*` played along better w/ checked
exceptions... but that's clearly not the case... \*sigh\*
throwing `IOException` is actually a key part of [the `FileSet`
"contract"](https://github.com/apache/gobblin/blob/5b495aff5d5bbe840add9f2db76ca944dffc69aa/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/partition/FileSet.java#L142),
so substituting an unchecked `RuntimeException` (that no caller expects and
would NOT be looking out for) is not something we ought to do at this late
stage.
instead, either write this iteratively (using a `for`-each loop) or follow
[`IcebergDataset`'s
use](https://github.com/apache/gobblin/blob/af4b822a1e2f79721cdff617f1581e77ff261580/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java#L205)
of
[`CheckedExceptionFunction.wrapToTunneled`](https://github.com/apache/gobblin/blob/585298fb5ebc074f69c1b9db87de6186c4855b26/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionFunction.java#L69)
```
try {
...
} catch (CheckedExceptionFunction.WrappedIOException wrapper) {
wrapper.rethrowWrapped();
}
```
the code there actually uses:
```
copyConfig.getCopyContext().getFileStatus(targetFs, new Path(pathStr)).isPresent()
```
for caching, which shouldn't be necessary here, given
`IcebergTable::getPartitionSpecificDataFiles` examines only a single snapshot.
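A minimal sketch of the `for`-each alternative mentioned above, keeping the checked `IOException` on the method signature (names taken from the diff; the exact map handling is an assumption):
```
private Map<Path, FileStatus> calcSrcFileStatusByDestFilePath(Map<Path, DataFile> destDataFileBySrcPath)
    throws IOException {
  Map<Path, FileStatus> srcFileStatusByDestFilePath = Maps.newHashMap();
  // plain for-each loop, so getFileStatus may propagate IOException directly
  for (Map.Entry<Path, DataFile> entry : destDataFileBySrcPath.entrySet()) {
    Path destPath = new Path(entry.getValue().path().toString());
    srcFileStatusByDestFilePath.put(destPath, this.sourceFs.getFileStatus(entry.getKey()));
  }
  return srcFileStatusByDestFilePath;
}
```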
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetTest.java:
##########
@@ -75,9 +73,10 @@ public class IcebergPartitionDatasetTest {
   private static final String DEST_WRITE_LOCATION = DEST_TEST_DB + "/" + DEST_TEST_TABLE + "/data";
   private static final String TEST_ICEBERG_PARTITION_COLUMN_NAME = "testPartition";
   private static final String TEST_ICEBERG_PARTITION_COLUMN_VALUE = "testValue";
+  private static final String OVERWRITE_COMMIT_STEP = "org.apache.gobblin.data.management.copy.iceberg.IcebergOverwritePartitionsStep";
Review Comment:
nit: `IcebergOverwritePartitionsStep.class.getName()`
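in other words, roughly (sketch; constant name reused from the diff above):
```
private static final String OVERWRITE_COMMIT_STEP = IcebergOverwritePartitionsStep.class.getName();
```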
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java:
##########
@@ -226,6 +226,90 @@ public void testNewTablePropertiesAreRegistered() throws Exception {
     catalog.dropTable(destTableId);
   }
+  /** Verify that getPartitionSpecificDataFiles return datafiles belonging to the partition defined by predicate */
+  @Test
+  public void testGetPartitionSpecificDataFiles() throws IOException {
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/file3.orc",
+        "/path/tableName/data/id=2/file5.orc",
+        "/path/tableName/data/file4.orc",
+        "/path/tableName/data/id=3/file2.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, "1");
+    Map<String, PartitionData> pathsWithPartitionData = Maps.newHashMap();
+    paths.forEach(path -> pathsWithPartitionData.put(path, partitionData));
+
+    addPartitionDataFiles(table, createDataFiles(pathsWithPartitionData));
+
+    IcebergTable icebergTable = new IcebergTable(tableId,
+        catalog.newTableOps(tableId),
+        catalogUri,
+        catalog.loadTable(tableId));
+    // Using AlwaysTrue & AlwaysFalse Predicate to avoid mocking of predicate class
+    Predicate<StructLike> alwaysTruePredicate = partition -> true;
+    Predicate<StructLike> alwaysFalsePredicate = partition -> false;
+    Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(), 5);
+    Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(), 0);
+  }
+
+  /** Verify that overwritePartition replace data files belonging to given partition col and value */
+  @Test
+  public void testOverwritePartition() throws IOException {
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/file2.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, "1");
+    Map<String, PartitionData> pathsWithPartitionData = Maps.newHashMap();
+    paths.forEach(path -> pathsWithPartitionData.put(path, partitionData));
+
+    addPartitionDataFiles(table, createDataFiles(pathsWithPartitionData));
+
+    IcebergTable icebergTable = new IcebergTable(tableId,
+        catalog.newTableOps(tableId),
+        catalogUri,
+        catalog.loadTable(tableId));
+
+    verifyAnyOrder(paths, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match");
+
+    List<String> paths2 = Arrays.asList(
+        "/path/tableName/data/file3.orc",
+        "/path/tableName/data/id=2/file4.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData2 = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData2.set(0, "2");
+    Map<String, PartitionData> paths2WithPartitionData2 = Maps.newHashMap();
+    paths2.forEach(path -> paths2WithPartitionData2.put(path, partitionData2));
+
+    List<DataFile> partition2DataFiles = createDataFiles(paths2WithPartitionData2);
+    // here, since partition data with value 2 doesn't exist yet,
+    // we expect it to get added to the table, w/o changing or deleting any other partitions
+    icebergTable.overwritePartition(partition2DataFiles, "id", "2");
+    List<String> expectedPaths2 = new ArrayList<>(paths);
+    expectedPaths2.addAll(paths2);
+    verifyAnyOrder(expectedPaths2, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match");
+
+    List<String> paths3 = Arrays.asList(
+        "/path/tableName/data/id=2/file5.orc",
Review Comment:
suggest a note that this naming convention is irrelevant to partition
membership, as evidenced by `"/path/tableName/data/id=2/file4.orc"` also in
`paths2`
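for example, the note could be an inline comment (sketch only; the wording is hypothetical):
```
List<String> paths3 = Arrays.asList(
    // NOTE: the `id=2` path segment is only a naming convention and has no bearing on
    // partition membership - cf. "/path/tableName/data/id=2/file4.orc" in `paths2`
    "/path/tableName/data/id=2/file5.orc",
    "/path/tableName/data/file6.orc"
);
```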
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java:
##########
@@ -119,11 +117,13 @@ public void testGetCurrentSnapshotInfo() throws IOException {
   }
   /** Verify failure when attempting to get current snapshot info for non-existent table */
-  @Test(expectedExceptions = {IcebergTable.TableNotFoundException.class, NoSuchTableException.class})
+  @Test(expectedExceptions = {IcebergTable.TableNotFoundException.class})
   public void testGetCurrentSnapshotInfoOnBogusTable() throws IOException {
     TableIdentifier bogusTableId = TableIdentifier.of(dbName, tableName + "_BOGUS");
+    // Passing null for Table as catalog.loadTable(bogusTableId) will throw NoSuchTableException so
Review Comment:
typo on the exception class name
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetTest.java:
##########
@@ -225,39 +199,57 @@ public void testWithDifferentSrcAndDestTableWriteLocation() throws IOException {
     List<CopyEntity> copyEntities = (List<CopyEntity>) icebergPartitionDataset.generateCopyEntities(targetFs, copyConfiguration);
-    Assert.assertEquals(copyEntities.size(), 2);
-    verifyCopyEntities(copyEntities, false);
+    verifyCopyEntities(copyEntities, 2, false);
   }
-  private List<DataFile> getDataFiles() throws IOException {
+  private static void setupSrcFileSystem() throws IOException {
+    sourceFs = Mockito.mock(FileSystem.class);
+    Mockito.when(sourceFs.getUri()).thenReturn(SRC_FS_URI);
+    Mockito.when(sourceFs.makeQualified(any(Path.class)))
+        .thenAnswer(invocation -> invocation.getArgument(0, Path.class).makeQualified(SRC_FS_URI, new Path("/")));
+    Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenAnswer(invocation -> {
+      Path path = invocation.getArgument(0, Path.class);
+      Path qualifiedPath = sourceFs.makeQualified(path);
+      return IcebergDatasetTest.MockFileSystemBuilder.createEmptyFileStatus(qualifiedPath.toString());
+    });
+  }
+
+  private static void setupDestFileSystem() throws IOException {
+    targetFs = Mockito.mock(FileSystem.class);
+    Mockito.when(targetFs.getUri()).thenReturn(DEST_FS_URI);
+    Mockito.when(targetFs.makeQualified(any(Path.class)))
+        .thenAnswer(invocation -> invocation.getArgument(0, Path.class).makeQualified(DEST_FS_URI, new Path("/")));
+    // Since we are adding UUID to the file name for every file while creating destination path,
+    // so return file not found exception if trying to find file status on destination file system
+    Mockito.when(targetFs.getFileStatus(any(Path.class))).thenThrow(new FileNotFoundException());
+  }
+
+  private static List<DataFile> createDataFileMocks() throws IOException {
     List<DataFile> dataFiles = new ArrayList<>();
     for (String srcFilePath : srcFilePaths) {
       DataFile dataFile = Mockito.mock(DataFile.class);
       Path dataFilePath = new Path(srcFilePath);
       Path qualifiedPath = sourceFs.makeQualified(dataFilePath);
       Mockito.when(dataFile.path()).thenReturn(dataFilePath.toString());
-      Mockito.when(sourceFs.getFileStatus(Mockito.eq(dataFilePath))).thenReturn(getFileStatus(qualifiedPath));
+      Mockito.when(sourceFs.getFileStatus(Mockito.eq(dataFilePath))).thenReturn(
+          IcebergDatasetTest.MockFileSystemBuilder.createEmptyFileStatus(qualifiedPath.toString()));
       dataFiles.add(dataFile);
     }
    return dataFiles;
  }
-  private static FileStatus getFileStatus(Path path) {
-    FileStatus fileStatus = new FileStatus();
-    fileStatus.setPath(path);
-    return fileStatus;
-  }
-
-  private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, boolean sameSrcAndDestWriteLocation) {
+  private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, int expectedCopyEntitiesSize,
Review Comment:
taking in the expected size is a definite improvement, but let's go further
to also take in the `expectedSrcFilePaths` - please [see my
reply](https://github.com/apache/gobblin/pull/4058#discussion_r1811938323)
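a sketch of what that extended signature might look like (the `expectedSrcFilePaths` parameter and the derived size check are assumptions about the intent, not code from the PR):
```
private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, List<String> expectedSrcFilePaths,
    boolean sameSrcAndDestWriteLocation) {
  Assert.assertEquals(copyEntities.size(), expectedSrcFilePaths.size());
  // ... then also assert each copy entity's src path appears in expectedSrcFilePaths ...
}
```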
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java:
##########
@@ -93,6 +93,7 @@ public class IcebergDatasetTest {
   private static final String MANIFEST_PATH_0 = ROOT_PATH + "metadata/manifest.a";
   private static final String MANIFEST_DATA_PATH_0A = ROOT_PATH + "data/p0/a";
   private static final String MANIFEST_DATA_PATH_0B = ROOT_PATH + "data/p0/b";
+  private static final String REGISTER_COMMIT_STEP = "org.apache.gobblin.data.management.copy.iceberg.IcebergRegisterStep";
Review Comment:
`IcebergRegisterStep.class.getName()` will engage the type checker, whereas
a string literal would not
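i.e., roughly (sketch):
```
private static final String REGISTER_COMMIT_STEP = IcebergRegisterStep.class.getName();
```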
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java:
##########
@@ -226,6 +226,90 @@ public void testNewTablePropertiesAreRegistered() throws Exception {
     catalog.dropTable(destTableId);
   }
+  /** Verify that getPartitionSpecificDataFiles return datafiles belonging to the partition defined by predicate */
+  @Test
+  public void testGetPartitionSpecificDataFiles() throws IOException {
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/file3.orc",
+        "/path/tableName/data/id=2/file5.orc",
+        "/path/tableName/data/file4.orc",
+        "/path/tableName/data/id=3/file2.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, "1");
+    Map<String, PartitionData> pathsWithPartitionData = Maps.newHashMap();
+    paths.forEach(path -> pathsWithPartitionData.put(path, partitionData));
+
+    addPartitionDataFiles(table, createDataFiles(pathsWithPartitionData));
+
+    IcebergTable icebergTable = new IcebergTable(tableId,
+        catalog.newTableOps(tableId),
+        catalogUri,
+        catalog.loadTable(tableId));
+    // Using AlwaysTrue & AlwaysFalse Predicate to avoid mocking of predicate class
+    Predicate<StructLike> alwaysTruePredicate = partition -> true;
+    Predicate<StructLike> alwaysFalsePredicate = partition -> false;
+    Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(), 5);
+    Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(), 0);
+  }
+
+  /** Verify that overwritePartition replace data files belonging to given partition col and value */
+  @Test
+  public void testOverwritePartition() throws IOException {
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/file2.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, "1");
+    Map<String, PartitionData> pathsWithPartitionData = Maps.newHashMap();
+    paths.forEach(path -> pathsWithPartitionData.put(path, partitionData));
+
+    addPartitionDataFiles(table, createDataFiles(pathsWithPartitionData));
+
+    IcebergTable icebergTable = new IcebergTable(tableId,
+        catalog.newTableOps(tableId),
+        catalogUri,
+        catalog.loadTable(tableId));
+
+    verifyAnyOrder(paths, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match");
+
+    List<String> paths2 = Arrays.asList(
+        "/path/tableName/data/file3.orc",
+        "/path/tableName/data/id=2/file4.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData2 = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData2.set(0, "2");
+    Map<String, PartitionData> paths2WithPartitionData2 = Maps.newHashMap();
+    paths2.forEach(path -> paths2WithPartitionData2.put(path, partitionData2));
+
+    List<DataFile> partition2DataFiles = createDataFiles(paths2WithPartitionData2);
+    // here, since partition data with value 2 doesn't exist yet,
+    // we expect it to get added to the table, w/o changing or deleting any other partitions
+    icebergTable.overwritePartition(partition2DataFiles, "id", "2");
+    List<String> expectedPaths2 = new ArrayList<>(paths);
+    expectedPaths2.addAll(paths2);
+    verifyAnyOrder(expectedPaths2, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match");
+
+    List<String> paths3 = Arrays.asList(
+        "/path/tableName/data/id=2/file5.orc",
+        "/path/tableName/data/file6.orc"
+    );
+    // Reusing same partition data to create data file with different paths
+    Map<String, PartitionData> paths3WithPartitionData = Maps.newHashMap();
+    paths3.forEach(path -> paths3WithPartitionData.put(path, partitionData));
+    List<DataFile> partition1NewDataFiles = createDataFiles(paths3WithPartitionData);
Review Comment:
NBD, but for a one-liner:
```
List<DataFile> partition1NewDataFiles = createDataFiles(paths3.stream().collect(Collectors.toMap(x -> x, x -> partition1Data)));
```
(alternative to `x -> x` is `Function.identity()` - your choice)
Issue Time Tracking
-------------------
Worklog Id: (was: 939634)
Time Spent: 10h 40m (was: 10.5h)
> Support Partition Based Copy in Iceberg Distcp
> ----------------------------------------------
>
> Key: GOBBLIN-2159
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2159
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Vivek Rai
> Priority: Major
> Time Spent: 10h 40m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)