This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new ba655b7  Spark: Add duplicate file check in add_files (#2779)
ba655b7 is described below

commit ba655b7180dbfc89783f35ba13ee771ce9050598
Author: Szehon Ho <[email protected]>
AuthorDate: Thu Aug 12 19:15:08 2021 -0700

    Spark: Add duplicate file check in add_files (#2779)
    
    * Spark: Add duplicate file check in add_files
    
    Adds a check that the data files being imported with add_files do not 
already exist in the target table.
---
 .../org/apache/iceberg/spark/SparkTableUtil.java   | 130 +++++++++++++++++--
 .../iceberg/spark/actions/BaseSparkAction.java     |  44 +------
 .../spark/extensions/TestAddFilesProcedure.java    | 140 ++++++++++++++++++++-
 .../spark/procedures/AddFilesProcedure.java        |  38 ++++--
 4 files changed, 289 insertions(+), 63 deletions(-)

diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java 
b/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index e822d45..c37d290 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -36,10 +36,12 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.data.TableMigrationUtil;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hadoop.SerializableConfiguration;
@@ -54,6 +56,7 @@ import 
org.apache.iceberg.relocated.com.google.common.base.Objects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
 import org.apache.spark.TaskContext;
@@ -63,6 +66,8 @@ import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapPartitionsFunction;
 import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrameReader;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -78,6 +83,8 @@ import org.apache.spark.sql.catalyst.expressions.Expression;
 import org.apache.spark.sql.catalyst.expressions.NamedExpression;
 import org.apache.spark.sql.catalyst.parser.ParseException;
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Function2;
 import scala.Option;
 import scala.Predef;
@@ -95,11 +102,19 @@ import static org.apache.spark.sql.functions.col;
  */
 public class SparkTableUtil {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkTableUtil.class);
+
   private static final Joiner.MapJoiner MAP_JOINER = 
Joiner.on(",").withKeyValueSeparator("=");
 
   private static final PathFilter HIDDEN_PATH_FILTER =
       p -> !p.getName().startsWith("_") && !p.getName().startsWith(".");
 
+  private static final String duplicateFileMessage = "Cannot complete import 
because data files " +
+      "to be imported already exist within the target table: %s.  " +
+      "This is disabled by default as Iceberg is not designed for mulitple 
references to the same file" +
+      " within the same table.  If you are sure, you may set 
'check_duplicate_files' to false to force the import.";
+
+
   private SparkTableUtil() {
   }
 
@@ -364,9 +379,11 @@ public class SparkTableUtil {
    * @param targetTable an Iceberg table where to import the data
    * @param stagingDir a staging directory to store temporary manifest files
    * @param partitionFilter only import partitions whose values match those in 
the map, can be partially defined
+   * @param checkDuplicateFiles if true, throw exception if import results in 
a duplicate data file
    */
   public static void importSparkTable(SparkSession spark, TableIdentifier 
sourceTableIdent, Table targetTable,
-                                      String stagingDir, Map<String, String> 
partitionFilter) {
+                                      String stagingDir, Map<String, String> 
partitionFilter,
+                                      boolean checkDuplicateFiles) {
     SessionCatalog catalog = spark.sessionState().catalog();
 
     String db = sourceTableIdent.database().nonEmpty() ?
@@ -382,13 +399,13 @@ public class SparkTableUtil {
       PartitionSpec spec = SparkSchemaUtil.specForTable(spark, 
sourceTableIdentWithDB.unquotedString());
 
       if (Objects.equal(spec, PartitionSpec.unpartitioned())) {
-        importUnpartitionedSparkTable(spark, sourceTableIdentWithDB, 
targetTable);
+        importUnpartitionedSparkTable(spark, sourceTableIdentWithDB, 
targetTable, checkDuplicateFiles);
       } else {
         List<SparkPartition> sourceTablePartitions = getPartitions(spark, 
sourceTableIdent,
             partitionFilter);
         Preconditions.checkArgument(!sourceTablePartitions.isEmpty(),
             "Cannot find any partitions in table %s", sourceTableIdent);
-        importSparkPartitions(spark, sourceTablePartitions, targetTable, spec, 
stagingDir);
+        importSparkPartitions(spark, sourceTablePartitions, targetTable, spec, 
stagingDir, checkDuplicateFiles);
       }
     } catch (AnalysisException e) {
       throw SparkExceptionUtil.toUncheckedException(
@@ -407,14 +424,31 @@ public class SparkTableUtil {
    * @param sourceTableIdent an identifier of the source Spark table
    * @param targetTable an Iceberg table where to import the data
    * @param stagingDir a staging directory to store temporary manifest files
+   * @param checkDuplicateFiles if true, throw exception if import results in 
a duplicate data file
+   */
+  public static void importSparkTable(SparkSession spark, TableIdentifier 
sourceTableIdent, Table targetTable,
+                                      String stagingDir, boolean 
checkDuplicateFiles) {
+    importSparkTable(spark, sourceTableIdent, targetTable, stagingDir, 
Collections.emptyMap(), checkDuplicateFiles);
+  }
+
+  /**
+   * Import files from an existing Spark table to an Iceberg table.
+   *
+   * The import uses the Spark session to get table metadata. It assumes no
+   * operation is going on the original and target table and thus is not
+   * thread-safe.
+   * @param spark a Spark session
+   * @param sourceTableIdent an identifier of the source Spark table
+   * @param targetTable an Iceberg table where to import the data
+   * @param stagingDir a staging directory to store temporary manifest files
    */
   public static void importSparkTable(SparkSession spark, TableIdentifier 
sourceTableIdent, Table targetTable,
                                       String stagingDir) {
-    importSparkTable(spark, sourceTableIdent, targetTable, stagingDir, 
Collections.emptyMap());
+    importSparkTable(spark, sourceTableIdent, targetTable, stagingDir, 
Collections.emptyMap(), false);
   }
 
   private static void importUnpartitionedSparkTable(SparkSession spark, 
TableIdentifier sourceTableIdent,
-                                                    Table targetTable) {
+                                                    Table targetTable, boolean 
checkDuplicateFiles) {
     try {
       CatalogTable sourceTable = 
spark.sessionState().catalog().getTableMetadata(sourceTableIdent);
       Option<String> format =
@@ -431,6 +465,17 @@ public class SparkTableUtil {
       List<DataFile> files = TableMigrationUtil.listPartition(
           partition, Util.uriToString(sourceTable.location()), format.get(), 
spec, conf, metricsConfig, nameMapping);
 
+      if (checkDuplicateFiles) {
+        Dataset<Row> importedFiles = spark.createDataset(
+            Lists.transform(files, f -> f.path().toString()), 
Encoders.STRING()).toDF("file_path");
+        Dataset<Row> existingFiles = loadMetadataTable(spark, targetTable, 
MetadataTableType.ENTRIES);
+        Column joinCond = 
existingFiles.col("data_file.file_path").equalTo(importedFiles.col("file_path"));
+        Dataset<String> duplicates = importedFiles.join(existingFiles, 
joinCond)
+            .select("file_path").as(Encoders.STRING());
+        Preconditions.checkState(duplicates.isEmpty(),
+            String.format(duplicateFileMessage, Joiner.on(",").join((String[]) 
duplicates.take(10))));
+      }
+
       AppendFiles append = targetTable.newAppend();
       files.forEach(append::appendFile);
       append.commit();
@@ -451,9 +496,10 @@ public class SparkTableUtil {
    * @param targetTable an Iceberg table where to import the data
    * @param spec a partition spec
    * @param stagingDir a staging directory to store temporary manifest files
+   * @param checkDuplicateFiles if true, throw exception if import results in 
a duplicate data file
    */
   public static void importSparkPartitions(SparkSession spark, 
List<SparkPartition> partitions, Table targetTable,
-                                           PartitionSpec spec, String 
stagingDir) {
+                                           PartitionSpec spec, String 
stagingDir, boolean checkDuplicateFiles) {
     Configuration conf = spark.sessionState().newHadoopConf();
     SerializableConfiguration serializableConf = new 
SerializableConfiguration(conf);
     int parallelism = Math.min(partitions.size(), 
spark.sessionState().conf().parallelPartitionDiscoveryParallelism());
@@ -469,10 +515,22 @@ public class SparkTableUtil {
         partitionRDD.rdd(),
         Encoders.javaSerialization(SparkPartition.class));
 
-    List<ManifestFile> manifests = partitionDS
+    Dataset<DataFile> filesToImport = partitionDS
         .flatMap((FlatMapFunction<SparkPartition, DataFile>) sparkPartition ->
                 listPartition(sparkPartition, spec, serializableConf, 
metricsConfig, nameMapping).iterator(),
-            Encoders.javaSerialization(DataFile.class))
+            Encoders.javaSerialization(DataFile.class));
+
+    if (checkDuplicateFiles) {
+      Dataset<Row> importedFiles = filesToImport.map(f -> f.path().toString(), 
Encoders.STRING()).toDF("file_path");
+      Dataset<Row> existingFiles = loadMetadataTable(spark, targetTable, 
MetadataTableType.ENTRIES);
+      Column joinCond = 
existingFiles.col("data_file.file_path").equalTo(importedFiles.col("file_path"));
+      Dataset<String> duplicates = importedFiles.join(existingFiles, joinCond)
+          .select("file_path").as(Encoders.STRING());
+      Preconditions.checkState(duplicates.isEmpty(),
+          String.format(duplicateFileMessage, Joiner.on(",").join((String[]) 
duplicates.take(10))));
+    }
+
+    List<ManifestFile> manifests = filesToImport
         .repartition(numShufflePartitions)
         .map((MapFunction<DataFile, Tuple2<String, DataFile>>) file ->
                 Tuple2.apply(file.path().toString(), file),
@@ -504,6 +562,20 @@ public class SparkTableUtil {
     }
   }
 
+  /**
+   * Import files from given partitions to an Iceberg table.
+   *
+   * @param spark a Spark session
+   * @param partitions partitions to import
+   * @param targetTable an Iceberg table where to import the data
+   * @param spec a partition spec
+   * @param stagingDir a staging directory to store temporary manifest files
+   */
+  public static void importSparkPartitions(SparkSession spark, 
List<SparkPartition> partitions, Table targetTable,
+                                           PartitionSpec spec, String 
stagingDir) {
+    importSparkPartitions(spark, partitions, targetTable, spec, stagingDir, 
false);
+  }
+
   public static List<SparkPartition> filterPartitions(List<SparkPartition> 
partitions,
                                                       Map<String, String> 
partitionFilter) {
     if (partitionFilter.isEmpty()) {
@@ -522,6 +594,48 @@ public class SparkTableUtil {
         .run(item -> io.deleteFile(item.path()));
   }
 
+  // Attempt to use Spark3 Catalog resolution if available on the path
+  private static final DynMethods.UnboundMethod LOAD_CATALOG = 
DynMethods.builder("loadCatalogMetadataTable")
+      .hiddenImpl("org.apache.iceberg.spark.Spark3Util", SparkSession.class, 
String.class, MetadataTableType.class)
+      .orNoop()
+      .build();
+
+  public static Dataset<Row> loadCatalogMetadataTable(SparkSession spark, 
String tableName, MetadataTableType type) {
+    Preconditions.checkArgument(!LOAD_CATALOG.isNoop(), "Cannot find 
Spark3Util class but Spark3 is in use");
+    return LOAD_CATALOG.asStatic().invoke(spark, tableName, type);
+  }
+
+  public static Dataset<Row> loadMetadataTable(SparkSession spark, Table 
table, MetadataTableType type) {
+    String tableName = table.name();
+    String tableLocation = table.location();
+
+    DataFrameReader dataFrameReader = spark.read().format("iceberg");
+    if (tableName.contains("/")) {
+      // Hadoop Table or Metadata location passed, load without a catalog
+      return dataFrameReader.load(tableName + "#" + type);
+    }
+
+    // Try DSV2 catalog based name based resolution
+    if (spark.version().startsWith("3")) {
+      Dataset<Row> catalogMetadataTable = loadCatalogMetadataTable(spark, 
tableName, type);
+      if (catalogMetadataTable != null) {
+        return catalogMetadataTable;
+      }
+    }
+
+    // Catalog based resolution failed, our catalog may be a non-DatasourceV2 
Catalog
+    if (tableName.startsWith("hadoop.")) {
+      // Try loading by location as Hadoop table without Catalog
+      return dataFrameReader.load(tableLocation + "#" + type);
+    } else if (tableName.startsWith("hive")) {
+      // Try loading by name as a Hive table without Catalog
+      return dataFrameReader.load(tableName.replaceFirst("hive\\.", "") + "." 
+ type);
+    } else {
+      throw new IllegalArgumentException(String.format(
+          "Cannot find the metadata table for %s of type %s", tableName, 
type));
+    }
+  }
+
   /**
    * Class representing a table partition.
    */
diff --git 
a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java 
b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 785d9a3..4560b27 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -33,20 +33,18 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.actions.Action;
 import org.apache.iceberg.actions.ManifestFileBean;
-import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.io.ClosingIterator;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.JobGroupInfo;
 import org.apache.iceberg.spark.JobGroupUtils;
+import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.sql.DataFrameReader;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -151,46 +149,8 @@ abstract class BaseSparkAction<ThisT, R> implements 
Action<ThisT, R> {
     return manifestDF.union(otherMetadataFileDF).union(manifestListDF);
   }
 
-  // Attempt to use Spark3 Catalog resolution if available on the path
-  private static final DynMethods.UnboundMethod LOAD_CATALOG = 
DynMethods.builder("loadCatalogMetadataTable")
-      .hiddenImpl("org.apache.iceberg.spark.Spark3Util", SparkSession.class, 
String.class, MetadataTableType.class)
-      .orNoop()
-      .build();
-
-  private Dataset<Row> loadCatalogMetadataTable(String tableName, 
MetadataTableType type) {
-    Preconditions.checkArgument(!LOAD_CATALOG.isNoop(), "Cannot find 
Spark3Util class but Spark3 is in use");
-    return LOAD_CATALOG.asStatic().invoke(spark, tableName, type);
-  }
-
   protected Dataset<Row> loadMetadataTable(Table table, MetadataTableType 
type) {
-    String tableName = table.name();
-    String tableLocation = table.location();
-
-    DataFrameReader dataFrameReader = spark.read().format("iceberg");
-    if (tableName.contains("/")) {
-      // Hadoop Table or Metadata location passed, load without a catalog
-      return dataFrameReader.load(tableName + "#" + type);
-    }
-
-    // Try DSV2 catalog based name based resolution
-    if (spark.version().startsWith("3")) {
-      Dataset<Row> catalogMetadataTable = loadCatalogMetadataTable(tableName, 
type);
-      if (catalogMetadataTable != null) {
-        return catalogMetadataTable;
-      }
-    }
-
-    // Catalog based resolution failed, our catalog may be a non-DatasourceV2 
Catalog
-    if (tableName.startsWith("hadoop.")) {
-      // Try loading by location as Hadoop table without Catalog
-      return dataFrameReader.load(tableLocation + "#" + type);
-    } else if (tableName.startsWith("hive")) {
-      // Try loading by name as a Hive table without Catalog
-      return dataFrameReader.load(tableName.replaceFirst("hive\\.", "") + "." 
+ type);
-    } else {
-      throw new IllegalArgumentException(String.format(
-          "Cannot find the metadata table for %s of type %s", tableName, 
type));
-    }
+    return SparkTableUtil.loadMetadataTable(spark, table, type);
   }
 
   private static class ReadManifest implements 
FlatMapFunction<ManifestFileBean, String> {
diff --git 
a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
 
b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
index dea01a1..50ab8b9 100644
--- 
a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
+++ 
b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
@@ -444,6 +444,145 @@ public class TestAddFilesProcedure extends 
SparkExtensionsTestBase {
             catalogName, tableName, fileTableDir.getAbsolutePath()));
   }
 
+
+  @Test
+  public void addTwice() {
+    createPartitionedHiveTable();
+
+    String createIceberg =
+        "CREATE TABLE %s (id Integer, name String, dept String, subdept 
String) USING iceberg PARTITIONED BY (id)";
+
+    sql(createIceberg, tableName);
+
+    Object result1 = scalarSql("CALL %s.system.add_files(" +
+            "table => '%s', " +
+            "source_table => '%s', " +
+            "partition_filter => map('id', 1))",
+        catalogName, tableName, sourceTableName);
+    Assert.assertEquals(2L, result1);
+
+    Object result2 = scalarSql("CALL %s.system.add_files(" +
+            "table => '%s', " +
+            "source_table => '%s', " +
+            "partition_filter => map('id', 2))",
+        catalogName, tableName, sourceTableName);
+    Assert.assertEquals(2L, result2);
+
+    assertEquals("Iceberg table contains correct data",
+        sql("SELECT id, name, dept, subdept FROM %s WHERE id = 1 ORDER BY id", 
sourceTableName),
+        sql("SELECT id, name, dept, subdept FROM %s WHERE id = 1 ORDER BY id", 
tableName));
+    assertEquals("Iceberg table contains correct data",
+        sql("SELECT id, name, dept, subdept FROM %s WHERE id = 2 ORDER BY id", 
sourceTableName),
+        sql("SELECT id, name, dept, subdept FROM %s WHERE id = 2 ORDER BY id", 
tableName));
+  }
+
+  @Test
+  public void duplicateDataPartitioned() {
+    createPartitionedHiveTable();
+
+    String createIceberg =
+        "CREATE TABLE %s (id Integer, name String, dept String, subdept 
String) USING iceberg PARTITIONED BY (id)";
+
+    sql(createIceberg, tableName);
+
+    scalarSql("CALL %s.system.add_files(" +
+            "table => '%s', " +
+            "source_table => '%s', " +
+            "partition_filter => map('id', 1))",
+        catalogName, tableName, sourceTableName);
+
+    AssertHelpers.assertThrows("Should not allow adding duplicate files",
+        IllegalStateException.class,
+        "Cannot complete import because data files to be imported already" +
+            " exist within the target table",
+        () -> scalarSql("CALL %s.system.add_files(" +
+            "table => '%s', " +
+            "source_table => '%s', " +
+            "partition_filter => map('id', 1))",
+        catalogName, tableName, sourceTableName));
+  }
+
+  @Test
+  public void duplicateDataPartitionedAllowed() {
+    createPartitionedHiveTable();
+
+    String createIceberg =
+        "CREATE TABLE %s (id Integer, name String, dept String, subdept 
String) USING iceberg PARTITIONED BY (id)";
+
+    sql(createIceberg, tableName);
+
+    Object result1 = scalarSql("CALL %s.system.add_files(" +
+            "table => '%s', " +
+            "source_table => '%s', " +
+            "partition_filter => map('id', 1))",
+        catalogName, tableName, sourceTableName);
+
+    Assert.assertEquals(2L, result1);
+
+    Object result2 = scalarSql("CALL %s.system.add_files(" +
+            "table => '%s', " +
+            "source_table => '%s', " +
+            "partition_filter => map('id', 1)," +
+            "check_duplicate_files => false)",
+        catalogName, tableName, sourceTableName);
+
+    Assert.assertEquals(2L, result2);
+
+
+    assertEquals("Iceberg table contains correct data",
+        sql("SELECT id, name, dept, subdept FROM %s WHERE id = 1 UNION ALL " +
+            "SELECT id, name, dept, subdept FROM %s WHERE id = 1", 
sourceTableName, sourceTableName),
+        sql("SELECT id, name, dept, subdept FROM %s", tableName, tableName));
+  }
+
+  @Test
+  public void duplicateDataUnpartitioned() {
+    createUnpartitionedHiveTable();
+
+    String createIceberg =
+        "CREATE TABLE %s (id Integer, name String, dept String, subdept 
String) USING iceberg";
+
+    sql(createIceberg, tableName);
+
+    scalarSql("CALL %s.system.add_files('%s', '%s')",
+        catalogName, tableName, sourceTableName);
+
+    AssertHelpers.assertThrows("Should not allow adding duplicate files",
+        IllegalStateException.class,
+        "Cannot complete import because data files to be imported already" +
+            " exist within the target table",
+        () -> scalarSql("CALL %s.system.add_files('%s', '%s')",
+            catalogName, tableName, sourceTableName));
+  }
+
+  @Test
+  public void duplicateDataUnpartitionedAllowed() {
+    createUnpartitionedHiveTable();
+
+    String createIceberg =
+        "CREATE TABLE %s (id Integer, name String, dept String, subdept 
String) USING iceberg";
+
+    sql(createIceberg, tableName);
+
+    Object result1 = scalarSql("CALL %s.system.add_files('%s', '%s')",
+        catalogName, tableName, sourceTableName);
+    Assert.assertEquals(2L, result1);
+
+    Object result2 = scalarSql("CALL %s.system.add_files(" +
+        "table => '%s', " +
+            "source_table => '%s'," +
+            "check_duplicate_files => false)",
+        catalogName, tableName, sourceTableName);
+    Assert.assertEquals(2L, result2);
+
+    assertEquals("Iceberg table contains correct data",
+        sql("SELECT * FROM (SELECT * FROM %s UNION ALL " +
+            "SELECT * from %s) ORDER BY id", sourceTableName, sourceTableName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+
+  }
+
   private static final StructField[] struct = {
       new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
       new StructField("name", DataTypes.StringType, false, Metadata.empty()),
@@ -463,7 +602,6 @@ public class TestAddFilesProcedure extends 
SparkExtensionsTestBase {
   private static final Dataset<Row> partitionedDF =
       unpartitionedDF.select("name", "dept", "subdept", "id");
 
-
   private static final Dataset<Row> compositePartitionedDF =
       unpartitionedDF.select("name", "subdept", "id", "dept");
 
diff --git 
a/spark3/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java
 
b/spark3/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java
index 4969962..a6802af 100644
--- 
a/spark3/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java
+++ 
b/spark3/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java
@@ -59,7 +59,8 @@ class AddFilesProcedure extends BaseProcedure {
   private static final ProcedureParameter[] PARAMETERS = new 
ProcedureParameter[]{
       ProcedureParameter.required("table", DataTypes.StringType),
       ProcedureParameter.required("source_table", DataTypes.StringType),
-      ProcedureParameter.optional("partition_filter", STRING_MAP)
+      ProcedureParameter.optional("partition_filter", STRING_MAP),
+      ProcedureParameter.optional("check_duplicate_files", 
DataTypes.BooleanType)
   };
 
   private static final StructType OUTPUT_TYPE = new StructType(new 
StructField[]{
@@ -105,7 +106,14 @@ class AddFilesProcedure extends BaseProcedure {
           });
     }
 
-    long addedFilesCount = importToIceberg(tableIdent, sourceIdent, 
partitionFilter);
+    boolean checkDuplicateFiles;
+    if (args.isNullAt(3)) {
+      checkDuplicateFiles = true;
+    } else {
+      checkDuplicateFiles = args.getBoolean(3);
+    }
+
+    long addedFilesCount = importToIceberg(tableIdent, sourceIdent, 
partitionFilter, checkDuplicateFiles);
     return new InternalRow[]{newInternalRow(addedFilesCount)};
   }
 
@@ -117,7 +125,8 @@ class AddFilesProcedure extends BaseProcedure {
             namespace[0].equalsIgnoreCase("avro"));
   }
 
-  private long importToIceberg(Identifier destIdent, Identifier sourceIdent, 
Map<String, String> partitionFilter) {
+  private long importToIceberg(Identifier destIdent, Identifier sourceIdent, 
Map<String, String> partitionFilter,
+                               boolean checkDuplicateFiles) {
     return modifyIcebergTable(destIdent, table -> {
 
       validatePartitionSpec(table, partitionFilter);
@@ -126,9 +135,9 @@ class AddFilesProcedure extends BaseProcedure {
       if (isFileIdentifier(sourceIdent)) {
         Path sourcePath = new Path(sourceIdent.name());
         String format = sourceIdent.namespace()[0];
-        importFileTable(table, sourcePath, format, partitionFilter);
+        importFileTable(table, sourcePath, format, partitionFilter, 
checkDuplicateFiles);
       } else {
-        importCatalogTable(table, sourceIdent, partitionFilter);
+        importCatalogTable(table, sourceIdent, partitionFilter, 
checkDuplicateFiles);
       }
 
       Snapshot snapshot = table.currentSnapshot();
@@ -147,7 +156,8 @@ class AddFilesProcedure extends BaseProcedure {
     }
   }
 
-  private void importFileTable(Table table, Path tableLocation, String format, 
Map<String, String> partitionFilter) {
+  private void importFileTable(Table table, Path tableLocation, String format, 
Map<String, String> partitionFilter,
+                               boolean checkDuplicateFiles) {
     // List Partitions via Spark InMemory file search interface
     List<SparkPartition> partitions = Spark3Util.getPartitions(spark(), 
tableLocation, format);
 
@@ -158,7 +168,7 @@ class AddFilesProcedure extends BaseProcedure {
 
       // Build a Global Partition for the source
       SparkPartition partition = new SparkPartition(Collections.emptyMap(), 
tableLocation.toString(), format);
-      importPartitions(table, ImmutableList.of(partition));
+      importPartitions(table, ImmutableList.of(partition), 
checkDuplicateFiles);
     } else {
       Preconditions.checkArgument(!partitions.isEmpty(),
           "Cannot find any partitions in table %s", partitions);
@@ -166,19 +176,23 @@ class AddFilesProcedure extends BaseProcedure {
       Preconditions.checkArgument(!filteredPartitions.isEmpty(),
           "Cannot find any partitions which match the given filter. Partition 
filter is %s",
           MAP_JOINER.join(partitionFilter));
-      importPartitions(table, filteredPartitions);
+      importPartitions(table, filteredPartitions, checkDuplicateFiles);
     }
   }
 
-  private void importCatalogTable(Table table, Identifier sourceIdent, 
Map<String, String> partitionFilter) {
+  private void importCatalogTable(Table table, Identifier sourceIdent, 
Map<String, String> partitionFilter,
+                                  boolean checkDuplicateFiles) {
     String stagingLocation = getMetadataLocation(table);
     TableIdentifier sourceTableIdentifier = 
Spark3Util.toV1TableIdentifier(sourceIdent);
-    SparkTableUtil.importSparkTable(spark(), sourceTableIdentifier, table, 
stagingLocation, partitionFilter);
+    SparkTableUtil.importSparkTable(spark(), sourceTableIdentifier, table, 
stagingLocation, partitionFilter,
+        checkDuplicateFiles);
   }
 
-  private void importPartitions(Table table, 
List<SparkTableUtil.SparkPartition> partitions) {
+  private void importPartitions(Table table, 
List<SparkTableUtil.SparkPartition> partitions,
+                                boolean checkDuplicateFiles) {
     String stagingLocation = getMetadataLocation(table);
-    SparkTableUtil.importSparkPartitions(spark(), partitions, table, 
table.spec(), stagingLocation);
+    SparkTableUtil.importSparkPartitions(spark(), partitions, table, 
table.spec(), stagingLocation,
+        checkDuplicateFiles);
   }
 
   private String getMetadataLocation(Table table) {

Reply via email to