[
https://issues.apache.org/jira/browse/GOBBLIN-2159?focusedWorklogId=939406&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-939406
]
ASF GitHub Bot logged work on GOBBLIN-2159:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 22/Oct/24 08:34
Start Date: 22/Oct/24 08:34
Worklog Time Spent: 10m
Work Description: Blazer-007 commented on code in PR #4058:
URL: https://github.com/apache/gobblin/pull/4058#discussion_r1810232322
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetTest.java:
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyContext;
+import org.apache.gobblin.data.management.copy.PreserveAttributes;
+import
org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateUtil;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+
+import static org.mockito.ArgumentMatchers.any;
+
+
+/** Tests for {@link
org.apache.gobblin.data.management.copy.iceberg.IcebergPartitionDataset} */
+public class IcebergPartitionDatasetTest {
+ private IcebergTable srcIcebergTable;
+ private IcebergTable destIcebergTable;
+ private TableMetadata srcTableMetadata;
+ private TableMetadata destTableMetadata;
+ private FileSystem sourceFs;
+ private FileSystem targetFs;
+ private IcebergPartitionDataset icebergPartitionDataset;
+ private MockedStatic<IcebergPartitionFilterPredicateUtil>
icebergPartitionFilterPredicateUtil;
+ private static final String SRC_TEST_DB = "srcTestDB";
+ private static final String SRC_TEST_TABLE = "srcTestTable";
+ private static final String SRC_WRITE_LOCATION = SRC_TEST_DB + "/" +
SRC_TEST_TABLE + "/data";
+ private static final String DEST_TEST_DB = "destTestDB";
+ private static final String DEST_TEST_TABLE = "destTestTable";
+ private static final String DEST_WRITE_LOCATION = DEST_TEST_DB + "/" +
DEST_TEST_TABLE + "/data";
+ private static final String TEST_ICEBERG_PARTITION_COLUMN_NAME =
"testPartition";
+ private static final String TEST_ICEBERG_PARTITION_COLUMN_VALUE =
"testValue";
+ private final Properties copyConfigProperties = new Properties();
+ private final Properties properties = new Properties();
+ private List<String> srcFilePaths;
+
+ private static final URI SRC_FS_URI;
+ private static final URI DEST_FS_URI;
+
+ static {
+ try {
+ SRC_FS_URI = new URI("abc", "the.source.org", "/", null);
+ DEST_FS_URI = new URI("xyz", "the.dest.org", "/", null);
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("should not occur!", e);
+ }
+ }
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ setupSrcFileSystem();
+ setupDestFileSystem();
+
+ TableIdentifier tableIdentifier = TableIdentifier.of(SRC_TEST_DB,
SRC_TEST_TABLE);
+
+ srcIcebergTable = Mockito.mock(IcebergTable.class);
+ destIcebergTable = Mockito.mock(IcebergTable.class);
+
+ srcTableMetadata = Mockito.mock(TableMetadata.class);
+ destTableMetadata = Mockito.mock(TableMetadata.class);
+
Mockito.when(destTableMetadata.spec()).thenReturn(Mockito.mock(PartitionSpec.class));
+
+ Mockito.when(srcIcebergTable.getTableId()).thenReturn(tableIdentifier);
+ Mockito.when(destIcebergTable.getTableId()).thenReturn(tableIdentifier);
+
Mockito.when(srcIcebergTable.accessTableMetadata()).thenReturn(srcTableMetadata);
+
Mockito.when(destIcebergTable.accessTableMetadata()).thenReturn(destTableMetadata);
+
Mockito.when(srcIcebergTable.getDatasetDescriptor(Mockito.any())).thenReturn(Mockito.mock(DatasetDescriptor.class));
+
Mockito.when(destIcebergTable.getDatasetDescriptor(Mockito.any())).thenReturn(Mockito.mock(DatasetDescriptor.class));
+
+ icebergPartitionFilterPredicateUtil =
Mockito.mockStatic(IcebergPartitionFilterPredicateUtil.class);
+ icebergPartitionFilterPredicateUtil
+ .when(() ->
IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(Mockito.anyString(),
Mockito.any(), Mockito.any()))
+ .thenReturn(Optional.of(0));
+
+ copyConfigProperties.setProperty("data.publisher.final.dir", "/test");
+ srcFilePaths = new ArrayList<>();
+ }
+
+ @AfterMethod
+ public void cleanUp() {
+ srcFilePaths.clear();
+ icebergPartitionFilterPredicateUtil.close();
+ }
+
+ private void setupSrcFileSystem() throws IOException {
+ sourceFs = Mockito.mock(FileSystem.class);
+ Mockito.when(sourceFs.getUri()).thenReturn(SRC_FS_URI);
+ Mockito.when(sourceFs.makeQualified(any(Path.class)))
+ .thenAnswer(invocation -> invocation.getArgument(0,
Path.class).makeQualified(SRC_FS_URI, new Path("/")));
+
Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenAnswer(invocation -> {
+ Path path = invocation.getArgument(0, Path.class);
+ Path qualifiedPath = sourceFs.makeQualified(path);
+ return getFileStatus(qualifiedPath);
+ });
+ }
+
+ private void setupDestFileSystem() throws IOException {
+ targetFs = Mockito.mock(FileSystem.class);
+ Mockito.when(targetFs.getUri()).thenReturn(DEST_FS_URI);
+ Mockito.when(targetFs.makeQualified(any(Path.class)))
+ .thenAnswer(invocation -> invocation.getArgument(0,
Path.class).makeQualified(DEST_FS_URI, new Path("/")));
+ // Since we are adding UUID to the file name for every file while creating
destination path,
+ // so return file not found exception if trying to find file status on
destination file system
+ Mockito.when(targetFs.getFileStatus(any(Path.class))).thenThrow(new
FileNotFoundException());
+ }
+
+ @Test
+ public void testGenerateCopyEntities() throws IOException {
+ srcFilePaths.add(SRC_WRITE_LOCATION + "/file1.orc");
+ List<DataFile> srcDataFiles = getDataFiles();
+
Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(srcDataFiles);
+
+ icebergPartitionDataset = new TestIcebergPartitionDataset(srcIcebergTable,
destIcebergTable, properties, sourceFs,
+ true);
+
+ CopyConfiguration copyConfiguration =
+ CopyConfiguration.builder(targetFs,
copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString(""))
+ .copyContext(new CopyContext()).build();
+
+ Collection<CopyEntity> copyEntities =
icebergPartitionDataset.generateCopyEntities(targetFs, copyConfiguration);
+
+ Assert.assertEquals(copyEntities.size(), 2);
+ verifyCopyEntities(copyEntities, true);
Review Comment:
I am doing that, but in a different way: since we are adding a UUID at
runtime, we can't know the expected path beforehand. Please have a look at
this function:
```
private static void verifyCopyEntities(Collection<CopyEntity> copyEntities,
boolean sameSrcAndDestWriteLocation) {
String srcWriteLocationStart = SRC_FS_URI + SRC_WRITE_LOCATION;
String destWriteLocationStart = DEST_FS_URI +
(sameSrcAndDestWriteLocation ? SRC_WRITE_LOCATION : DEST_WRITE_LOCATION);
String srcErrorMsg = String.format("Source Location should start with
%s", srcWriteLocationStart);
String destErrorMsg = String.format("Destination Location should start
with %s", destWriteLocationStart);
for (CopyEntity copyEntity : copyEntities) {
String json = copyEntity.toString();
if (isCopyableFile(json)) {
String originFilepath = getOriginFilePathAsStringFromJson(json);
String destFilepath = getDestinationFilePathAsStringFromJson(json);
Assert.assertTrue(originFilepath.startsWith(srcWriteLocationStart),
srcErrorMsg);
Assert.assertTrue(destFilepath.startsWith(destWriteLocationStart),
destErrorMsg);
String originFileName =
originFilepath.substring(srcWriteLocationStart.length() + 1);
String destFileName =
destFilepath.substring(destWriteLocationStart.length() + 1);
Assert.assertTrue(destFileName.endsWith(originFileName), "Incorrect
file name in destination path");
Assert.assertTrue(destFileName.length() > originFileName.length() +
1,
"Destination file name should be longer than source file name as
UUID is appended");
} else{
verifyPostPublishStep(json);
}
}
}
```
Issue Time Tracking
-------------------
Worklog Id: (was: 939406)
Time Spent: 7h 50m (was: 7h 40m)
> Support Partition Based Copy in Iceberg Distcp
> ----------------------------------------------
>
> Key: GOBBLIN-2159
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2159
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Vivek Rai
> Priority: Major
> Time Spent: 7h 50m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)