[ 
https://issues.apache.org/jira/browse/GOBBLIN-1707?focusedWorklogId=811089&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-811089
 ]

ASF GitHub Bot logged work on GOBBLIN-1707:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Sep/22 08:00
            Start Date: 22/Sep/22 08:00
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3564:
URL: https://github.com/apache/gobblin/pull/3564#discussion_r977325546


##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hive.HiveMetastoreTest;
+import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+
+/** Test {@link org.apache.gobblin.data.management.copy.iceberg.IcebergTable} 
*/
+public class IcebergTableTest extends HiveMetastoreTest {
+
+  protected static final org.apache.iceberg.shaded.org.apache.avro.Schema 
avroDataSchema =
+      SchemaBuilder.record("test")
+          .fields()
+          .name("id")
+          .type()
+          .longType()
+          .noDefault()
+          .endRecord();
+  protected static final Schema icebergSchema = 
AvroSchemaUtil.toIceberg(avroDataSchema);
+  protected static final PartitionSpec icebergPartitionSpec = 
PartitionSpec.builderFor(icebergSchema)
+      .identity("id")
+      .build();
+
+  private final String dbName = "myicebergdb";
+  private final String tableName = "justtesting";
+  private TableIdentifier tableId;
+  private Table table;
+  private String metadataBasePath;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    startMetastore();
+    catalog.createNamespace(Namespace.of(dbName));
+  }
+
+  @BeforeMethod
+  public void setUpEachTest() {
+    tableId = TableIdentifier.of(dbName, tableName);
+    table = catalog.createTable(tableId, icebergSchema);
+    metadataBasePath = calcMetadataBasePath(tableId);
+  }
+
+  @AfterMethod
+  public void cleanUpEachTest() {
+    catalog.dropTable(tableId);
+  }
+
+  /** Verify info about the current snapshot only */
+  @Test
+  public void testGetCurrentSnapshotInfo() throws IOException {
+    List<List<String>> perSnapshotFilesets = Lists.newArrayList(
+        Lists.newArrayList("/path/to/data-a0.orc"),
+        Lists.newArrayList("/path/to/data-b0.orc", "/path/to/data-b1.orc"),
+        Lists.newArrayList("/path/to/data-c0.orc", "/path/to/data-c1.orc", 
"/path/to/data-c2.orc"),
+        Lists.newArrayList("/path/to/data-d0.orc")
+    );
+
+    initializeSnapshots(table, perSnapshotFilesets);
+    IcebergSnapshotInfo snapshotInfo = new 
IcebergTable(catalog.newTableOps(tableId)).getCurrentSnapshotInfo();
+
+    // verify metadata file
+    Optional<File> optMetadataFile = 
extractSomeMetadataFilepath(snapshotInfo.getMetadataPath(), metadataBasePath, 
IcebergTableTest::doesResembleMetadataFilename);
+    Assert.assertTrue(optMetadataFile.isPresent(), "has metadata filepath");
+    verifyMetadataFile(optMetadataFile.get(), 
Optional.of(perSnapshotFilesets.size()));
+    // verify manifest list file
+    Optional<File> optManifestListFile = 
extractSomeMetadataFilepath(snapshotInfo.getManifestListPath(), 
metadataBasePath, IcebergTableTest::doesResembleManifestListFilename);
+    Assert.assertTrue(optManifestListFile.isPresent(), "has manifest list 
filepath");
+    verifyManifestListFile(optManifestListFile.get(), 
Optional.of(snapshotInfo.getSnapshotId()));
+    // verify manifest files and their listed data files
+    List<IcebergSnapshotInfo.ManifestFileInfo> manifestFileInfos = 
snapshotInfo.getManifestFiles();
+    verifyManifestFiles(manifestFileInfos, 
snapshotInfo.getManifestFilePaths(), perSnapshotFilesets);
+    verifyAnyOrder(snapshotInfo.getAllDataFilePaths(), 
flatten(perSnapshotFilesets), "data filepaths");
+    // verify all aforementioned paths collectively equal `getAllPaths()`
+    List<String> allPathsExpected = 
Lists.newArrayList(snapshotInfo.getMetadataPath(), 
snapshotInfo.getManifestListPath());
+    allPathsExpected.addAll(snapshotInfo.getManifestFilePaths());
+    allPathsExpected.addAll(snapshotInfo.getAllDataFilePaths());
+    verifyAnyOrder(snapshotInfo.getAllPaths(), allPathsExpected, "all paths, 
metadata and data");
+  }
+
+  protected String calcMetadataBasePath(TableIdentifier tableId) {
+    return calcMetadataBasePath(tableId.namespace().toString(), 
tableId.name());
+  }
+
+  protected String calcMetadataBasePath(String theDbName, String theTableName) 
{
+    String basePath = String.format("%s/%s/metadata", 
metastore.getDatabasePath(theDbName), theTableName);
+    System.err.println("calculated metadata base path: '" + basePath + "'");
+    return basePath;
+  }
+
+  /** Add one snapshot per sub-list of `perSnapshotFilesets`, in order, with 
the sub-list contents as its data files */
+  protected static void initializeSnapshots(Table table, List<List<String>> 
perSnapshotFilesets) {
+    for (List<String> snapshotFileset : perSnapshotFilesets) {
+      AppendFiles a = table.newAppend();

Review Comment:
   Sure, I updated it to `append`.





Issue Time Tracking
-------------------

            Worklog Id:     (was: 811089)
    Remaining Estimate: 0h
            Time Spent: 10m

> Add Iceberg support to DistCp
> -----------------------------
>
>                 Key: GOBBLIN-1707
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1707
>             Project: Apache Gobblin
>          Issue Type: Task
>          Components: gobblin-core
>            Reporter: Kip Kohn
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Add capability for Iceberg copy/replication to DistCp. Support incremental
> copy (only of the delta changes since the last copy) in addition to a full
> copy the first time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to