This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new ba655b7 Spark: Add duplicate file check in add_files (#2779)
ba655b7 is described below
commit ba655b7180dbfc89783f35ba13ee771ce9050598
Author: Szehon Ho <[email protected]>
AuthorDate: Thu Aug 12 19:15:08 2021 -0700
Spark: Add duplicate file check in add_files (#2779)
* Spark : Add duplicate file check in add_files
Adds a check that no files being added to a table with add_files already
exist within the table being added to.
---
.../org/apache/iceberg/spark/SparkTableUtil.java | 130 +++++++++++++++++--
.../iceberg/spark/actions/BaseSparkAction.java | 44 +------
.../spark/extensions/TestAddFilesProcedure.java | 140 ++++++++++++++++++++-
.../spark/procedures/AddFilesProcedure.java | 38 ++++--
4 files changed, 289 insertions(+), 63 deletions(-)
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
b/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index e822d45..c37d290 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -36,10 +36,12 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.data.TableMigrationUtil;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.hadoop.SerializableConfiguration;
@@ -54,6 +56,7 @@ import
org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.TaskContext;
@@ -63,6 +66,8 @@ import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
@@ -78,6 +83,8 @@ import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.NamedExpression;
import org.apache.spark.sql.catalyst.parser.ParseException;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Function2;
import scala.Option;
import scala.Predef;
@@ -95,11 +102,19 @@ import static org.apache.spark.sql.functions.col;
*/
public class SparkTableUtil {
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkTableUtil.class);
+
private static final Joiner.MapJoiner MAP_JOINER =
Joiner.on(",").withKeyValueSeparator("=");
private static final PathFilter HIDDEN_PATH_FILTER =
p -> !p.getName().startsWith("_") && !p.getName().startsWith(".");
+ private static final String duplicateFileMessage = "Cannot complete import
because data files " +
+ "to be imported already exist within the target table: %s. " +
+ "This is disabled by default as Iceberg is not designed for multiple
references to the same file" +
+ " within the same table. If you are sure, you may set
'check_duplicate_files' to false to force the import.";
+
+
private SparkTableUtil() {
}
@@ -364,9 +379,11 @@ public class SparkTableUtil {
* @param targetTable an Iceberg table where to import the data
* @param stagingDir a staging directory to store temporary manifest files
* @param partitionFilter only import partitions whose values match those in
the map, can be partially defined
+ * @param checkDuplicateFiles if true, throw exception if import results in
a duplicate data file
*/
public static void importSparkTable(SparkSession spark, TableIdentifier
sourceTableIdent, Table targetTable,
- String stagingDir, Map<String, String>
partitionFilter) {
+ String stagingDir, Map<String, String>
partitionFilter,
+ boolean checkDuplicateFiles) {
SessionCatalog catalog = spark.sessionState().catalog();
String db = sourceTableIdent.database().nonEmpty() ?
@@ -382,13 +399,13 @@ public class SparkTableUtil {
PartitionSpec spec = SparkSchemaUtil.specForTable(spark,
sourceTableIdentWithDB.unquotedString());
if (Objects.equal(spec, PartitionSpec.unpartitioned())) {
- importUnpartitionedSparkTable(spark, sourceTableIdentWithDB,
targetTable);
+ importUnpartitionedSparkTable(spark, sourceTableIdentWithDB,
targetTable, checkDuplicateFiles);
} else {
List<SparkPartition> sourceTablePartitions = getPartitions(spark,
sourceTableIdent,
partitionFilter);
Preconditions.checkArgument(!sourceTablePartitions.isEmpty(),
"Cannot find any partitions in table %s", sourceTableIdent);
- importSparkPartitions(spark, sourceTablePartitions, targetTable, spec,
stagingDir);
+ importSparkPartitions(spark, sourceTablePartitions, targetTable, spec,
stagingDir, checkDuplicateFiles);
}
} catch (AnalysisException e) {
throw SparkExceptionUtil.toUncheckedException(
@@ -407,14 +424,31 @@ public class SparkTableUtil {
* @param sourceTableIdent an identifier of the source Spark table
* @param targetTable an Iceberg table where to import the data
* @param stagingDir a staging directory to store temporary manifest files
+ * @param checkDuplicateFiles if true, throw exception if import results in
a duplicate data file
+ */
+ public static void importSparkTable(SparkSession spark, TableIdentifier
sourceTableIdent, Table targetTable,
+ String stagingDir, boolean
checkDuplicateFiles) {
+ importSparkTable(spark, sourceTableIdent, targetTable, stagingDir,
Collections.emptyMap(), checkDuplicateFiles);
+ }
+
+ /**
+ * Import files from an existing Spark table to an Iceberg table.
+ *
+ * The import uses the Spark session to get table metadata. It assumes no
+ * operation is going on in the original and target table and thus is not
+ * thread-safe.
+ * @param spark a Spark session
+ * @param sourceTableIdent an identifier of the source Spark table
+ * @param targetTable an Iceberg table where to import the data
+ * @param stagingDir a staging directory to store temporary manifest files
*/
public static void importSparkTable(SparkSession spark, TableIdentifier
sourceTableIdent, Table targetTable,
String stagingDir) {
- importSparkTable(spark, sourceTableIdent, targetTable, stagingDir,
Collections.emptyMap());
+ importSparkTable(spark, sourceTableIdent, targetTable, stagingDir,
Collections.emptyMap(), false);
}
private static void importUnpartitionedSparkTable(SparkSession spark,
TableIdentifier sourceTableIdent,
- Table targetTable) {
+ Table targetTable, boolean
checkDuplicateFiles) {
try {
CatalogTable sourceTable =
spark.sessionState().catalog().getTableMetadata(sourceTableIdent);
Option<String> format =
@@ -431,6 +465,17 @@ public class SparkTableUtil {
List<DataFile> files = TableMigrationUtil.listPartition(
partition, Util.uriToString(sourceTable.location()), format.get(),
spec, conf, metricsConfig, nameMapping);
+ if (checkDuplicateFiles) {
+ Dataset<Row> importedFiles = spark.createDataset(
+ Lists.transform(files, f -> f.path().toString()),
Encoders.STRING()).toDF("file_path");
+ Dataset<Row> existingFiles = loadMetadataTable(spark, targetTable,
MetadataTableType.ENTRIES);
+ Column joinCond =
existingFiles.col("data_file.file_path").equalTo(importedFiles.col("file_path"));
+ Dataset<String> duplicates = importedFiles.join(existingFiles,
joinCond)
+ .select("file_path").as(Encoders.STRING());
+ Preconditions.checkState(duplicates.isEmpty(),
+ String.format(duplicateFileMessage, Joiner.on(",").join((String[])
duplicates.take(10))));
+ }
+
AppendFiles append = targetTable.newAppend();
files.forEach(append::appendFile);
append.commit();
@@ -451,9 +496,10 @@ public class SparkTableUtil {
* @param targetTable an Iceberg table where to import the data
* @param spec a partition spec
* @param stagingDir a staging directory to store temporary manifest files
+ * @param checkDuplicateFiles if true, throw exception if import results in
a duplicate data file
*/
public static void importSparkPartitions(SparkSession spark,
List<SparkPartition> partitions, Table targetTable,
- PartitionSpec spec, String
stagingDir) {
+ PartitionSpec spec, String
stagingDir, boolean checkDuplicateFiles) {
Configuration conf = spark.sessionState().newHadoopConf();
SerializableConfiguration serializableConf = new
SerializableConfiguration(conf);
int parallelism = Math.min(partitions.size(),
spark.sessionState().conf().parallelPartitionDiscoveryParallelism());
@@ -469,10 +515,22 @@ public class SparkTableUtil {
partitionRDD.rdd(),
Encoders.javaSerialization(SparkPartition.class));
- List<ManifestFile> manifests = partitionDS
+ Dataset<DataFile> filesToImport = partitionDS
.flatMap((FlatMapFunction<SparkPartition, DataFile>) sparkPartition ->
listPartition(sparkPartition, spec, serializableConf,
metricsConfig, nameMapping).iterator(),
- Encoders.javaSerialization(DataFile.class))
+ Encoders.javaSerialization(DataFile.class));
+
+ if (checkDuplicateFiles) {
+ Dataset<Row> importedFiles = filesToImport.map(f -> f.path().toString(),
Encoders.STRING()).toDF("file_path");
+ Dataset<Row> existingFiles = loadMetadataTable(spark, targetTable,
MetadataTableType.ENTRIES);
+ Column joinCond =
existingFiles.col("data_file.file_path").equalTo(importedFiles.col("file_path"));
+ Dataset<String> duplicates = importedFiles.join(existingFiles, joinCond)
+ .select("file_path").as(Encoders.STRING());
+ Preconditions.checkState(duplicates.isEmpty(),
+ String.format(duplicateFileMessage, Joiner.on(",").join((String[])
duplicates.take(10))));
+ }
+
+ List<ManifestFile> manifests = filesToImport
.repartition(numShufflePartitions)
.map((MapFunction<DataFile, Tuple2<String, DataFile>>) file ->
Tuple2.apply(file.path().toString(), file),
@@ -504,6 +562,20 @@ public class SparkTableUtil {
}
}
+ /**
+ * Import files from given partitions to an Iceberg table.
+ *
+ * @param spark a Spark session
+ * @param partitions partitions to import
+ * @param targetTable an Iceberg table where to import the data
+ * @param spec a partition spec
+ * @param stagingDir a staging directory to store temporary manifest files
+ */
+ public static void importSparkPartitions(SparkSession spark,
List<SparkPartition> partitions, Table targetTable,
+ PartitionSpec spec, String
stagingDir) {
+ importSparkPartitions(spark, partitions, targetTable, spec, stagingDir,
false);
+ }
+
public static List<SparkPartition> filterPartitions(List<SparkPartition>
partitions,
Map<String, String>
partitionFilter) {
if (partitionFilter.isEmpty()) {
@@ -522,6 +594,48 @@ public class SparkTableUtil {
.run(item -> io.deleteFile(item.path()));
}
+ // Attempt to use Spark3 Catalog resolution if available on the path
+ private static final DynMethods.UnboundMethod LOAD_CATALOG =
DynMethods.builder("loadCatalogMetadataTable")
+ .hiddenImpl("org.apache.iceberg.spark.Spark3Util", SparkSession.class,
String.class, MetadataTableType.class)
+ .orNoop()
+ .build();
+
+ public static Dataset<Row> loadCatalogMetadataTable(SparkSession spark,
String tableName, MetadataTableType type) {
+ Preconditions.checkArgument(!LOAD_CATALOG.isNoop(), "Cannot find
Spark3Util class but Spark3 is in use");
+ return LOAD_CATALOG.asStatic().invoke(spark, tableName, type);
+ }
+
+ public static Dataset<Row> loadMetadataTable(SparkSession spark, Table
table, MetadataTableType type) {
+ String tableName = table.name();
+ String tableLocation = table.location();
+
+ DataFrameReader dataFrameReader = spark.read().format("iceberg");
+ if (tableName.contains("/")) {
+ // Hadoop Table or Metadata location passed, load without a catalog
+ return dataFrameReader.load(tableName + "#" + type);
+ }
+
+ // Try DSV2 catalog based name based resolution
+ if (spark.version().startsWith("3")) {
+ Dataset<Row> catalogMetadataTable = loadCatalogMetadataTable(spark,
tableName, type);
+ if (catalogMetadataTable != null) {
+ return catalogMetadataTable;
+ }
+ }
+
+ // Catalog based resolution failed, our catalog may be a non-DatasourceV2
Catalog
+ if (tableName.startsWith("hadoop.")) {
+ // Try loading by location as Hadoop table without Catalog
+ return dataFrameReader.load(tableLocation + "#" + type);
+ } else if (tableName.startsWith("hive")) {
+ // Try loading by name as a Hive table without Catalog
+ return dataFrameReader.load(tableName.replaceFirst("hive\\.", "") + "."
+ type);
+ } else {
+ throw new IllegalArgumentException(String.format(
+ "Cannot find the metadata table for %s of type %s", tableName,
type));
+ }
+ }
+
/**
* Class representing a table partition.
*/
diff --git
a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 785d9a3..4560b27 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -33,20 +33,18 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.actions.Action;
import org.apache.iceberg.actions.ManifestFileBean;
-import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.io.ClosingIterator;
import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.JobGroupUtils;
+import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
@@ -151,46 +149,8 @@ abstract class BaseSparkAction<ThisT, R> implements
Action<ThisT, R> {
return manifestDF.union(otherMetadataFileDF).union(manifestListDF);
}
- // Attempt to use Spark3 Catalog resolution if available on the path
- private static final DynMethods.UnboundMethod LOAD_CATALOG =
DynMethods.builder("loadCatalogMetadataTable")
- .hiddenImpl("org.apache.iceberg.spark.Spark3Util", SparkSession.class,
String.class, MetadataTableType.class)
- .orNoop()
- .build();
-
- private Dataset<Row> loadCatalogMetadataTable(String tableName,
MetadataTableType type) {
- Preconditions.checkArgument(!LOAD_CATALOG.isNoop(), "Cannot find
Spark3Util class but Spark3 is in use");
- return LOAD_CATALOG.asStatic().invoke(spark, tableName, type);
- }
-
protected Dataset<Row> loadMetadataTable(Table table, MetadataTableType
type) {
- String tableName = table.name();
- String tableLocation = table.location();
-
- DataFrameReader dataFrameReader = spark.read().format("iceberg");
- if (tableName.contains("/")) {
- // Hadoop Table or Metadata location passed, load without a catalog
- return dataFrameReader.load(tableName + "#" + type);
- }
-
- // Try DSV2 catalog based name based resolution
- if (spark.version().startsWith("3")) {
- Dataset<Row> catalogMetadataTable = loadCatalogMetadataTable(tableName,
type);
- if (catalogMetadataTable != null) {
- return catalogMetadataTable;
- }
- }
-
- // Catalog based resolution failed, our catalog may be a non-DatasourceV2
Catalog
- if (tableName.startsWith("hadoop.")) {
- // Try loading by location as Hadoop table without Catalog
- return dataFrameReader.load(tableLocation + "#" + type);
- } else if (tableName.startsWith("hive")) {
- // Try loading by name as a Hive table without Catalog
- return dataFrameReader.load(tableName.replaceFirst("hive\\.", "") + "."
+ type);
- } else {
- throw new IllegalArgumentException(String.format(
- "Cannot find the metadata table for %s of type %s", tableName,
type));
- }
+ return SparkTableUtil.loadMetadataTable(spark, table, type);
}
private static class ReadManifest implements
FlatMapFunction<ManifestFileBean, String> {
diff --git
a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
index dea01a1..50ab8b9 100644
---
a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
+++
b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
@@ -444,6 +444,145 @@ public class TestAddFilesProcedure extends
SparkExtensionsTestBase {
catalogName, tableName, fileTableDir.getAbsolutePath()));
}
+
+ @Test
+ public void addTwice() {
+ createPartitionedHiveTable();
+
+ String createIceberg =
+ "CREATE TABLE %s (id Integer, name String, dept String, subdept
String) USING iceberg PARTITIONED BY (id)";
+
+ sql(createIceberg, tableName);
+
+ Object result1 = scalarSql("CALL %s.system.add_files(" +
+ "table => '%s', " +
+ "source_table => '%s', " +
+ "partition_filter => map('id', 1))",
+ catalogName, tableName, sourceTableName);
+ Assert.assertEquals(2L, result1);
+
+ Object result2 = scalarSql("CALL %s.system.add_files(" +
+ "table => '%s', " +
+ "source_table => '%s', " +
+ "partition_filter => map('id', 2))",
+ catalogName, tableName, sourceTableName);
+ Assert.assertEquals(2L, result2);
+
+ assertEquals("Iceberg table contains correct data",
+ sql("SELECT id, name, dept, subdept FROM %s WHERE id = 1 ORDER BY id",
sourceTableName),
+ sql("SELECT id, name, dept, subdept FROM %s WHERE id = 1 ORDER BY id",
tableName));
+ assertEquals("Iceberg table contains correct data",
+ sql("SELECT id, name, dept, subdept FROM %s WHERE id = 2 ORDER BY id",
sourceTableName),
+ sql("SELECT id, name, dept, subdept FROM %s WHERE id = 2 ORDER BY id",
tableName));
+ }
+
+ @Test
+ public void duplicateDataPartitioned() {
+ createPartitionedHiveTable();
+
+ String createIceberg =
+ "CREATE TABLE %s (id Integer, name String, dept String, subdept
String) USING iceberg PARTITIONED BY (id)";
+
+ sql(createIceberg, tableName);
+
+ scalarSql("CALL %s.system.add_files(" +
+ "table => '%s', " +
+ "source_table => '%s', " +
+ "partition_filter => map('id', 1))",
+ catalogName, tableName, sourceTableName);
+
+ AssertHelpers.assertThrows("Should not allow adding duplicate files",
+ IllegalStateException.class,
+ "Cannot complete import because data files to be imported already" +
+ " exist within the target table",
+ () -> scalarSql("CALL %s.system.add_files(" +
+ "table => '%s', " +
+ "source_table => '%s', " +
+ "partition_filter => map('id', 1))",
+ catalogName, tableName, sourceTableName));
+ }
+
+ @Test
+ public void duplicateDataPartitionedAllowed() {
+ createPartitionedHiveTable();
+
+ String createIceberg =
+ "CREATE TABLE %s (id Integer, name String, dept String, subdept
String) USING iceberg PARTITIONED BY (id)";
+
+ sql(createIceberg, tableName);
+
+ Object result1 = scalarSql("CALL %s.system.add_files(" +
+ "table => '%s', " +
+ "source_table => '%s', " +
+ "partition_filter => map('id', 1))",
+ catalogName, tableName, sourceTableName);
+
+ Assert.assertEquals(2L, result1);
+
+ Object result2 = scalarSql("CALL %s.system.add_files(" +
+ "table => '%s', " +
+ "source_table => '%s', " +
+ "partition_filter => map('id', 1)," +
+ "check_duplicate_files => false)",
+ catalogName, tableName, sourceTableName);
+
+ Assert.assertEquals(2L, result2);
+
+
+ assertEquals("Iceberg table contains correct data",
+ sql("SELECT id, name, dept, subdept FROM %s WHERE id = 1 UNION ALL " +
+ "SELECT id, name, dept, subdept FROM %s WHERE id = 1",
sourceTableName, sourceTableName),
+ sql("SELECT id, name, dept, subdept FROM %s", tableName, tableName));
+ }
+
+ @Test
+ public void duplicateDataUnpartitioned() {
+ createUnpartitionedHiveTable();
+
+ String createIceberg =
+ "CREATE TABLE %s (id Integer, name String, dept String, subdept
String) USING iceberg";
+
+ sql(createIceberg, tableName);
+
+ scalarSql("CALL %s.system.add_files('%s', '%s')",
+ catalogName, tableName, sourceTableName);
+
+ AssertHelpers.assertThrows("Should not allow adding duplicate files",
+ IllegalStateException.class,
+ "Cannot complete import because data files to be imported already" +
+ " exist within the target table",
+ () -> scalarSql("CALL %s.system.add_files('%s', '%s')",
+ catalogName, tableName, sourceTableName));
+ }
+
+ @Test
+ public void duplicateDataUnpartitionedAllowed() {
+ createUnpartitionedHiveTable();
+
+ String createIceberg =
+ "CREATE TABLE %s (id Integer, name String, dept String, subdept
String) USING iceberg";
+
+ sql(createIceberg, tableName);
+
+ Object result1 = scalarSql("CALL %s.system.add_files('%s', '%s')",
+ catalogName, tableName, sourceTableName);
+ Assert.assertEquals(2L, result1);
+
+ Object result2 = scalarSql("CALL %s.system.add_files(" +
+ "table => '%s', " +
+ "source_table => '%s'," +
+ "check_duplicate_files => false)",
+ catalogName, tableName, sourceTableName);
+ Assert.assertEquals(2L, result2);
+
+ assertEquals("Iceberg table contains correct data",
+ sql("SELECT * FROM (SELECT * FROM %s UNION ALL " +
+ "SELECT * from %s) ORDER BY id", sourceTableName, sourceTableName),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+
+
+ }
+
private static final StructField[] struct = {
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("name", DataTypes.StringType, false, Metadata.empty()),
@@ -463,7 +602,6 @@ public class TestAddFilesProcedure extends
SparkExtensionsTestBase {
private static final Dataset<Row> partitionedDF =
unpartitionedDF.select("name", "dept", "subdept", "id");
-
private static final Dataset<Row> compositePartitionedDF =
unpartitionedDF.select("name", "subdept", "id", "dept");
diff --git
a/spark3/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java
b/spark3/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java
index 4969962..a6802af 100644
---
a/spark3/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java
+++
b/spark3/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java
@@ -59,7 +59,8 @@ class AddFilesProcedure extends BaseProcedure {
private static final ProcedureParameter[] PARAMETERS = new
ProcedureParameter[]{
ProcedureParameter.required("table", DataTypes.StringType),
ProcedureParameter.required("source_table", DataTypes.StringType),
- ProcedureParameter.optional("partition_filter", STRING_MAP)
+ ProcedureParameter.optional("partition_filter", STRING_MAP),
+ ProcedureParameter.optional("check_duplicate_files",
DataTypes.BooleanType)
};
private static final StructType OUTPUT_TYPE = new StructType(new
StructField[]{
@@ -105,7 +106,14 @@ class AddFilesProcedure extends BaseProcedure {
});
}
- long addedFilesCount = importToIceberg(tableIdent, sourceIdent,
partitionFilter);
+ boolean checkDuplicateFiles;
+ if (args.isNullAt(3)) {
+ checkDuplicateFiles = true;
+ } else {
+ checkDuplicateFiles = args.getBoolean(3);
+ }
+
+ long addedFilesCount = importToIceberg(tableIdent, sourceIdent,
partitionFilter, checkDuplicateFiles);
return new InternalRow[]{newInternalRow(addedFilesCount)};
}
@@ -117,7 +125,8 @@ class AddFilesProcedure extends BaseProcedure {
namespace[0].equalsIgnoreCase("avro"));
}
- private long importToIceberg(Identifier destIdent, Identifier sourceIdent,
Map<String, String> partitionFilter) {
+ private long importToIceberg(Identifier destIdent, Identifier sourceIdent,
Map<String, String> partitionFilter,
+ boolean checkDuplicateFiles) {
return modifyIcebergTable(destIdent, table -> {
validatePartitionSpec(table, partitionFilter);
@@ -126,9 +135,9 @@ class AddFilesProcedure extends BaseProcedure {
if (isFileIdentifier(sourceIdent)) {
Path sourcePath = new Path(sourceIdent.name());
String format = sourceIdent.namespace()[0];
- importFileTable(table, sourcePath, format, partitionFilter);
+ importFileTable(table, sourcePath, format, partitionFilter,
checkDuplicateFiles);
} else {
- importCatalogTable(table, sourceIdent, partitionFilter);
+ importCatalogTable(table, sourceIdent, partitionFilter,
checkDuplicateFiles);
}
Snapshot snapshot = table.currentSnapshot();
@@ -147,7 +156,8 @@ class AddFilesProcedure extends BaseProcedure {
}
}
- private void importFileTable(Table table, Path tableLocation, String format,
Map<String, String> partitionFilter) {
+ private void importFileTable(Table table, Path tableLocation, String format,
Map<String, String> partitionFilter,
+ boolean checkDuplicateFiles) {
// List Partitions via Spark InMemory file search interface
List<SparkPartition> partitions = Spark3Util.getPartitions(spark(),
tableLocation, format);
@@ -158,7 +168,7 @@ class AddFilesProcedure extends BaseProcedure {
// Build a Global Partition for the source
SparkPartition partition = new SparkPartition(Collections.emptyMap(),
tableLocation.toString(), format);
- importPartitions(table, ImmutableList.of(partition));
+ importPartitions(table, ImmutableList.of(partition),
checkDuplicateFiles);
} else {
Preconditions.checkArgument(!partitions.isEmpty(),
"Cannot find any partitions in table %s", partitions);
@@ -166,19 +176,23 @@ class AddFilesProcedure extends BaseProcedure {
Preconditions.checkArgument(!filteredPartitions.isEmpty(),
"Cannot find any partitions which match the given filter. Partition
filter is %s",
MAP_JOINER.join(partitionFilter));
- importPartitions(table, filteredPartitions);
+ importPartitions(table, filteredPartitions, checkDuplicateFiles);
}
}
- private void importCatalogTable(Table table, Identifier sourceIdent,
Map<String, String> partitionFilter) {
+ private void importCatalogTable(Table table, Identifier sourceIdent,
Map<String, String> partitionFilter,
+ boolean checkDuplicateFiles) {
String stagingLocation = getMetadataLocation(table);
TableIdentifier sourceTableIdentifier =
Spark3Util.toV1TableIdentifier(sourceIdent);
- SparkTableUtil.importSparkTable(spark(), sourceTableIdentifier, table,
stagingLocation, partitionFilter);
+ SparkTableUtil.importSparkTable(spark(), sourceTableIdentifier, table,
stagingLocation, partitionFilter,
+ checkDuplicateFiles);
}
- private void importPartitions(Table table,
List<SparkTableUtil.SparkPartition> partitions) {
+ private void importPartitions(Table table,
List<SparkTableUtil.SparkPartition> partitions,
+ boolean checkDuplicateFiles) {
String stagingLocation = getMetadataLocation(table);
- SparkTableUtil.importSparkPartitions(spark(), partitions, table,
table.spec(), stagingLocation);
+ SparkTableUtil.importSparkPartitions(spark(), partitions, table,
table.spec(), stagingLocation,
+ checkDuplicateFiles);
}
private String getMetadataLocation(Table table) {