[
https://issues.apache.org/jira/browse/GOBBLIN-2159?focusedWorklogId=939320&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-939320
]
ASF GitHub Bot logged work on GOBBLIN-2159:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 21/Oct/24 21:55
Start Date: 21/Oct/24 21:55
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4058:
URL: https://github.com/apache/gobblin/pull/4058#discussion_r1809524465
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java:
##########
@@ -333,4 +348,119 @@ protected static <T> void verifyAnyOrder(Collection<T>
actual, Collection<T> exp
protected static <T, C extends Collection<T>> List<T> flatten(Collection<C>
cc) {
return cc.stream().flatMap(x -> x.stream()).collect(Collectors.toList());
}
+
+ @Test
+ public void testGetPartitionSpecificDataFiles() throws IOException {
+ TableIdentifier testTableId = TableIdentifier.of(dbName,
"testTableForPartitionSpecificDataFiles");
+ Table testTable = catalog.createTable(testTableId, icebergSchema,
icebergPartitionSpec);
+
+ List<String> paths = Arrays.asList(
+ "/path/tableName/data/id=1/file1.orc",
+ "/path/tableName/data/id=1/file3.orc",
+ "/path/tableName/data/id=1/file5.orc",
+ "/path/tableName/data/id=1/file4.orc",
+ "/path/tableName/data/id=1/file2.orc"
+ );
+ // Using the schema defined at the start of this class
+ PartitionData partitionData = new
PartitionData(icebergPartitionSpec.partitionType());
+ partitionData.set(0, "1");
+ List<PartitionData> partitionDataList = Collections.nCopies(paths.size(),
partitionData);
+
+ addPartitionDataFiles(testTable, paths, partitionDataList);
+
+ IcebergTable icebergTable = new IcebergTable(testTableId,
+ catalog.newTableOps(testTableId),
+ catalogUri,
+ catalog.loadTable(testTableId));
+ // Using AlwaysTrue & AlwaysFalse Predicate to avoid mocking of predicate
class
+ Predicate<StructLike> alwaysTruePredicate = partition -> true;
+ Predicate<StructLike> alwaysFalsePredicate = partition -> false;
+
Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(),
5);
+
Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(),
0);
+
+ catalog.dropTable(testTableId);
+ }
+
+ @Test
+ public void testOverwritePartition() throws IOException {
+ TableIdentifier overwritePartitionTestTableId = TableIdentifier.of(dbName,
"testTableForOverwritePartition");
+ Table testTable = catalog.createTable(overwritePartitionTestTableId,
icebergSchema, icebergPartitionSpec);
+
+ List<String> paths = Arrays.asList(
+ "/path/tableName/data/id=1/file1.orc",
+ "/path/tableName/data/id=1/file2.orc"
+ );
+ // Using the schema defined at the start of this class
+ PartitionData partitionData = new
PartitionData(icebergPartitionSpec.partitionType());
+ partitionData.set(0, "1");
+ PartitionData partitionData2 = new
PartitionData(icebergPartitionSpec.partitionType());
+ partitionData2.set(0, "1");
+ List<PartitionData> partitionDataList = Arrays.asList(partitionData,
partitionData2);
+
+ addPartitionDataFiles(testTable, paths, partitionDataList);
+
+ IcebergTable icebergTable = new IcebergTable(overwritePartitionTestTableId,
+ catalog.newTableOps(overwritePartitionTestTableId),
+ catalogUri,
+ catalog.loadTable(overwritePartitionTestTableId));
+
+ verifyAnyOrder(paths,
icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths
should match");
+
+ List<String> paths2 = Arrays.asList(
+ "/path/tableName/data/id=2/file3.orc",
+ "/path/tableName/data/id=2/file4.orc"
+ );
+ // Using the schema defined at the start of this class
+ PartitionData partitionData3 = new
PartitionData(icebergPartitionSpec.partitionType());
+ partitionData3.set(0, "2");
+ PartitionData partitionData4 = new
PartitionData(icebergPartitionSpec.partitionType());
+ partitionData4.set(0, "2");
+ List<PartitionData> partitionDataList2 = Arrays.asList(partitionData3,
partitionData4);
+
+ List<DataFile> dataFiles2 = getDataFiles(paths2, partitionDataList2);
+ // here, since partition data with value 2 doesn't exist yet, we expect it
to get added to the table
+ icebergTable.overwritePartition(dataFiles2, "id", "2");
+ List<String> expectedPaths2 = new ArrayList<>(paths);
+ expectedPaths2.addAll(paths2);
+ verifyAnyOrder(expectedPaths2,
icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths
should match");
+
+ List<String> paths3 = Arrays.asList(
+ "/path/tableName/data/id=1/file5.orc",
+ "/path/tableName/data/id=1/file6.orc"
+ );
+ // Reusing the same partition data to create data files with different paths
+ List<DataFile> dataFiles3 = getDataFiles(paths3, partitionDataList);
+ // here, since partition data with value 1 already exists, we expect it to
get updated in the table with newer path
+ icebergTable.overwritePartition(dataFiles3, "id", "1");
+ List<String> expectedPaths3 = new ArrayList<>(paths2);
+ expectedPaths3.addAll(paths3);
+ verifyAnyOrder(expectedPaths3,
icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths
should match");
+
+ catalog.dropTable(overwritePartitionTestTableId);
+ }
+
+ private static void addPartitionDataFiles(Table table, List<String> paths,
List<PartitionData> partitionDataList) {
+ Assert.assertEquals(paths.size(), partitionDataList.size());
+ getDataFiles(paths, partitionDataList).forEach(dataFile ->
table.newAppend().appendFile(dataFile).commit());
+ }
Review Comment:
debatable whether there's a need for this abstraction, as it's just a `.forEach`
call. the confusing params might initially trick us into believing it's doing more
than that.
the impl should be:
```
void addPartitionDataFiles(Table table, List<DataFile> dataFiles) {
dataFiles.forEach(df -> table.newAppend().appendFile(df).commit());
}
```
then call it as:
```
addPartitionDataFiles(table, createDataFiles(partitionDataByPath))
```
Issue Time Tracking
-------------------
Worklog Id: (was: 939320)
Time Spent: 7h 10m (was: 7h)
> Support Partition Based Copy in Iceberg Distcp
> ----------------------------------------------
>
> Key: GOBBLIN-2159
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2159
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Vivek Rai
> Priority: Major
> Time Spent: 7h 10m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)