aokolnychyi commented on a change in pull request #2210: URL: https://github.com/apache/iceberg/pull/2210#discussion_r591753843
########## File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.iceberg.spark.procedures; + +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.mapping.MappingUtil; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkTableUtil; +import org.apache.iceberg.spark.SparkTableUtil.SparkPartition; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.TableIdentifier; +import org.apache.spark.sql.connector.catalog.CatalogPlugin; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import scala.runtime.BoxedUnit; + +class AddFilesProcedure extends BaseProcedure { + + private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{ + ProcedureParameter.required("table", DataTypes.StringType), + ProcedureParameter.required("source_table", DataTypes.StringType), + ProcedureParameter.optional("partition_filter", STRING_MAP) + }; + + private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{ + new StructField("added_files_count", DataTypes.LongType, false, Metadata.empty()) + }); + + private AddFilesProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + public static SparkProcedures.ProcedureBuilder builder() { + return new BaseProcedure.Builder<AddFilesProcedure>() { + @Override + protected AddFilesProcedure doBuild() { + return new AddFilesProcedure(tableCatalog()); + } + }; + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { + Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); + + 
CatalogPlugin sessionCat = spark().sessionState().catalogManager().v2SessionCatalog(); + Identifier sourceIdent = toCatalogAndIdentifier(args.getString(1), PARAMETERS[1].name(), sessionCat).identifier(); + + Map<String, String> partitionFilter = Maps.newHashMap(); + if (!args.isNullAt(2)) { + args.getMap(2).foreach(DataTypes.StringType, DataTypes.StringType, + (k, v) -> { + partitionFilter.put(k.toString(), v.toString()); + return BoxedUnit.UNIT; + }); + } + + long addedFilesCount = importToIceberg(tableIdent, sourceIdent, partitionFilter); + return new InternalRow[]{newInternalRow(addedFilesCount)}; + } + + private boolean isFileIdentifier(Identifier ident) { + String[] namespace = ident.namespace(); + return namespace.length == 1 && + (namespace[0].equalsIgnoreCase("orc") || + namespace[0].equalsIgnoreCase("parquet") || + namespace[0].equalsIgnoreCase("avro")); + } + + private long importToIceberg(Identifier destIdent, Identifier sourceIdent, Map<String, String> partitionFilter) { + return modifyIcebergTable(destIdent, table -> { + + validatePartitionSpec(table, partitionFilter); + ensureNameMappingPresent(table); + + if (isFileIdentifier(sourceIdent)) { + Path sourcePath = new Path(sourceIdent.name()); + String format = sourceIdent.namespace()[0]; + importFileTable(table, sourcePath, format, partitionFilter); + } else { + importCatalogTable(table, sourceIdent, partitionFilter); + } + + Snapshot snapshot = table.currentSnapshot(); + return Long.parseLong(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)); + }); + } + + private static void ensureNameMappingPresent(Table table) { Review comment: nit: why static? ########## File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java ########## @@ -0,0 +1,437 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.extensions; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestAddFilesProcedure extends SparkExtensionsTestBase { + + private String sourceTableName = "source_table"; + private File fileTableDir; + + public TestAddFilesProcedure(String catalogName, String implementation, Map<String, String> config) { + super(catalogName, implementation, config); + } + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Before + public void setupTempDirs() { + try { + fileTableDir = temp.newFolder(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @After + public void dropTables() { + sql("DROP TABLE IF EXISTS %s", sourceTableName); + sql("DROP TABLE IF EXISTS %s", sourceTableName); + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void addDataUnpartitioned() { + createUnpartitionedFileTable("parquet"); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`')", + catalogName, tableName, fileTableDir.getAbsolutePath()); + + Assert.assertEquals(2L, importOperation); + + assertEquals("Iceberg table contains correct data", + sql("SELECT * FROM %s ORDER BY id", sourceTableName), + sql("SELECT * FROM %s ORDER BY id", tableName)); + } + + + @Ignore // Classpath issues prevent us from actually writing to a Spark ORC table + public void addDataUnpartitionedOrc() { + createUnpartitionedFileTable("orc"); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`')", Review comment: Typo? Should be `orc`? ########## File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java ########## @@ -0,0 +1,437 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.extensions; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestAddFilesProcedure extends SparkExtensionsTestBase { + + private String sourceTableName = "source_table"; + private File fileTableDir; + + public TestAddFilesProcedure(String catalogName, String implementation, Map<String, String> config) { + super(catalogName, implementation, config); + } + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Before + public void setupTempDirs() { + try { + fileTableDir = temp.newFolder(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @After + public void dropTables() { + sql("DROP TABLE IF EXISTS %s", sourceTableName); + sql("DROP TABLE IF EXISTS %s", sourceTableName); + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void addDataUnpartitioned() { + createUnpartitionedFileTable("parquet"); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`')", + catalogName, tableName, fileTableDir.getAbsolutePath()); + + Assert.assertEquals(2L, importOperation); + + assertEquals("Iceberg table contains correct data", + sql("SELECT * FROM %s ORDER BY id", sourceTableName), + sql("SELECT * FROM %s ORDER BY id", tableName)); + } + + + @Ignore // Classpath issues prevent us from actually writing to a Spark ORC table + public void addDataUnpartitionedOrc() { + createUnpartitionedFileTable("orc"); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`')", + catalogName, tableName, fileTableDir.getAbsolutePath()); + + Assert.assertEquals(2L, importOperation); + + assertEquals("Iceberg table contains correct data", + sql("SELECT * FROM %s ORDER BY id", sourceTableName), + sql("SELECT * FROM %s ORDER BY id", tableName)); + } + + @Test + public void addDataUnpartitionedHive() { + createUnpartitionedHiveTable(); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '%s')", + catalogName, tableName, sourceTableName); + + Assert.assertEquals(2L, importOperation); + + assertEquals("Iceberg table contains correct data", + sql("SELECT * FROM %s ORDER BY id", sourceTableName), + sql("SELECT * FROM %s ORDER BY id", tableName)); + } + + @Test + public void addDataUnpartitionedExtraCol() { + createUnpartitionedFileTable("parquet"); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String, subdept String, foo string) USING 
iceberg"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`')", + catalogName, tableName, fileTableDir.getAbsolutePath()); + + Assert.assertEquals(2L, importOperation); + + assertEquals("Iceberg table contains correct data", + sql("SELECT * FROM %s ORDER BY id", sourceTableName), + sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName)); + } + + @Test + public void addDataUnpartitionedMissingCol() { + createUnpartitionedFileTable("parquet"); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String) USING iceberg"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`')", + catalogName, tableName, fileTableDir.getAbsolutePath()); + + Assert.assertEquals(2L, importOperation); + + assertEquals("Iceberg table contains correct data", + sql("SELECT id, name, dept FROM %s ORDER BY id", sourceTableName), + sql("SELECT * FROM %s ORDER BY id", tableName)); + } + + @Test + public void addDataPartitionedMissingCol() { + createPartitionedFileTable("parquet"); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String) USING iceberg PARTITIONED BY (id)"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`')", + catalogName, tableName, fileTableDir.getAbsolutePath()); + + Assert.assertEquals(8L, importOperation); + + assertEquals("Iceberg table contains correct data", + sql("SELECT id, name, dept FROM %s ORDER BY id", sourceTableName), + sql("SELECT * FROM %s ORDER BY id", tableName)); + } + + @Test + public void addDataPartitioned() { + createPartitionedFileTable("parquet"); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`')", + catalogName, tableName, fileTableDir.getAbsolutePath()); + + Assert.assertEquals(8L, importOperation); + + assertEquals("Iceberg table contains correct data", + sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", sourceTableName), + sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName)); + } + + @Ignore // Classpath issues prevent us from actually writing to a Spark ORC table + public void addDataPartitionedOrc() { + createPartitionedFileTable("orc"); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`')", Review comment: Typo? parquet -> orc. ########## File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java ########## @@ -0,0 +1,437 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.extensions; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestAddFilesProcedure extends SparkExtensionsTestBase { + + private String sourceTableName = "source_table"; Review comment: nit: could be final ########## File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + +package org.apache.iceberg.spark.procedures; + +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.mapping.MappingUtil; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkTableUtil; +import org.apache.iceberg.spark.SparkTableUtil.SparkPartition; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.TableIdentifier; +import org.apache.spark.sql.connector.catalog.CatalogPlugin; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import scala.runtime.BoxedUnit; + +class AddFilesProcedure extends BaseProcedure { + + private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{ + ProcedureParameter.required("table", DataTypes.StringType), + ProcedureParameter.required("source_table", DataTypes.StringType), + ProcedureParameter.optional("partition_filter", STRING_MAP) + }; + + private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{ + new StructField("added_files_count", DataTypes.LongType, false, Metadata.empty()) + }); + + private AddFilesProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + public static SparkProcedures.ProcedureBuilder builder() { + return new BaseProcedure.Builder<AddFilesProcedure>() { + @Override + protected AddFilesProcedure doBuild() { + return new AddFilesProcedure(tableCatalog()); + } + }; + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { + Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); + + CatalogPlugin sessionCat = spark().sessionState().catalogManager().v2SessionCatalog(); + Identifier sourceIdent = toCatalogAndIdentifier(args.getString(1), PARAMETERS[1].name(), sessionCat).identifier(); + + Map<String, String> partitionFilter = Maps.newHashMap(); + if (!args.isNullAt(2)) { + args.getMap(2).foreach(DataTypes.StringType, DataTypes.StringType, + (k, v) -> { + partitionFilter.put(k.toString(), v.toString()); + return BoxedUnit.UNIT; + }); + } + + long addedFilesCount = importToIceberg(tableIdent, sourceIdent, partitionFilter); + return new InternalRow[]{newInternalRow(addedFilesCount)}; + } + + private boolean isFileIdentifier(Identifier ident) { + String[] namespace = ident.namespace(); + return namespace.length == 1 && + (namespace[0].equalsIgnoreCase("orc") || + namespace[0].equalsIgnoreCase("parquet") || + namespace[0].equalsIgnoreCase("avro")); + } + + private long importToIceberg(Identifier destIdent, Identifier 
sourceIdent, Map<String, String> partitionFilter) { + return modifyIcebergTable(destIdent, table -> { + + validatePartitionSpec(table, partitionFilter); + ensureNameMappingPresent(table); + + if (isFileIdentifier(sourceIdent)) { + Path sourcePath = new Path(sourceIdent.name()); + String format = sourceIdent.namespace()[0]; + importFileTable(table, sourcePath, format, partitionFilter); + } else { + importCatalogTable(table, sourceIdent, partitionFilter); + } + + Snapshot snapshot = table.currentSnapshot(); + return Long.parseLong(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)); + }); + } + + private static void ensureNameMappingPresent(Table table) { + if (table.properties().get(TableProperties.DEFAULT_NAME_MAPPING) == null) { + // Forces Name based resolution instead of position based resolution + NameMapping mapping = MappingUtil.create(table.schema()); + String mappingJson = NameMappingParser.toJson(mapping); + table.updateProperties() + .set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson) + .commit(); + } + } + + private void importFileTable(Table table, Path tableLocation, String format, Map<String, String> partitionFilter) { + // List Partitions via Spark InMemory file search interface + List<SparkPartition> partitions = Spark3Util.getPartitions(spark(), tableLocation, format); + List<SparkPartition> filteredPartitions = SparkTableUtil.filterPartitions(partitions, partitionFilter); + importPartitions(table, filteredPartitions); + } + + private void importCatalogTable(Table table, Identifier sourceIdent, Map<String, String> partitionFilter) { + String stagingLocation = table.properties() + .getOrDefault(TableProperties.WRITE_METADATA_LOCATION, table.location() + "/metadata"); + TableIdentifier sourceTableIdentifier = Spark3Util.toV1TableIdentifier(sourceIdent); + SparkTableUtil.importSparkTable(spark(), sourceTableIdentifier, table, stagingLocation, partitionFilter); + } + + private void importPartitions(Table table, List<SparkTableUtil.SparkPartition> partitions) { + String stagingLocation = table.properties() + .getOrDefault(TableProperties.WRITE_METADATA_LOCATION, table.location() + "/metadata"); + SparkTableUtil.importSparkPartitions(spark(), partitions, table, table.spec(), stagingLocation); + } + + @Override + public String description() { + return null; + } + + private void validatePartitionSpec(Table table, Map<String, String> partitionFilter) { + List<PartitionField> partitionFields = table.spec().fields(); + Set<String> partitionNames = table.spec().fields().stream().map(PartitionField::name).collect(Collectors.toSet()); + + boolean tablePartitioned = !partitionFields.isEmpty(); + boolean partitionSpecPassed = !partitionFilter.isEmpty(); + + if (tablePartitioned && partitionSpecPassed) { + // Not enough partition columns + Preconditions.checkArgument(partitionFields.size() >= partitionFilter.size(), + "Cannot add data files to target table %s because that table is partitioned, " + + "but the number of columns in the provided partition filter (%d) " + + "is greater than the number of partitioned columns in table (%d)", + table.name(), partitionFilter.size(), partitionFields.size()); + + // Check for any non-identity partition columns + List<PartitionField> nonIdentityFields = + partitionFields.stream().filter(x -> !x.transform().isIdentity()).collect(Collectors.toList()); + Preconditions.checkArgument(nonIdentityFields.isEmpty(), + "Cannot add data files to target table %s because that table is partitioned and contains non-identity" + + "partition transforms which 
will not be compatible. Found non-identity fields %s", + table.name(), nonIdentityFields); + + // Check for any filters of non existent columns + List<String> unMatchedFilters = Review comment: nit: formatting ``` List<String> invalidFilters = partitionFilter.keySet().stream() .filter(filterName -> !partitionNames.contains(filterName)) .collect(Collectors.toList()); ``` ########## File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java ########## @@ -0,0 +1,437 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.extensions; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestAddFilesProcedure extends SparkExtensionsTestBase { + + private String sourceTableName = "source_table"; + private File fileTableDir; + + public TestAddFilesProcedure(String catalogName, String implementation, Map<String, String> config) { + super(catalogName, implementation, config); + } + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Before + public void setupTempDirs() { + try { + fileTableDir = temp.newFolder(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @After + public void dropTables() { + sql("DROP TABLE IF EXISTS %s", sourceTableName); Review comment: Do we need this twice? ########## File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.iceberg.spark.procedures; + +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.mapping.MappingUtil; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkTableUtil; +import org.apache.iceberg.spark.SparkTableUtil.SparkPartition; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.TableIdentifier; +import org.apache.spark.sql.connector.catalog.CatalogPlugin; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import scala.runtime.BoxedUnit; + +class AddFilesProcedure extends BaseProcedure { + + private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{ + ProcedureParameter.required("table", DataTypes.StringType), + ProcedureParameter.required("source_table", DataTypes.StringType), + ProcedureParameter.optional("partition_filter", STRING_MAP) + }; + + private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{ + new StructField("added_files_count", DataTypes.LongType, false, Metadata.empty()) + }); + + private AddFilesProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + public static SparkProcedures.ProcedureBuilder builder() { + return new BaseProcedure.Builder<AddFilesProcedure>() { + @Override + protected AddFilesProcedure doBuild() { + return new AddFilesProcedure(tableCatalog()); + } + }; + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { + Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); + + CatalogPlugin sessionCat = spark().sessionState().catalogManager().v2SessionCatalog(); + Identifier sourceIdent = toCatalogAndIdentifier(args.getString(1), PARAMETERS[1].name(), sessionCat).identifier(); + + Map<String, String> partitionFilter = Maps.newHashMap(); + if (!args.isNullAt(2)) { + args.getMap(2).foreach(DataTypes.StringType, DataTypes.StringType, + (k, v) -> { + partitionFilter.put(k.toString(), v.toString()); + return BoxedUnit.UNIT; + }); + } + + long addedFilesCount = importToIceberg(tableIdent, sourceIdent, 
partitionFilter); + return new InternalRow[]{newInternalRow(addedFilesCount)}; + } + + private boolean isFileIdentifier(Identifier ident) { + String[] namespace = ident.namespace(); + return namespace.length == 1 && + (namespace[0].equalsIgnoreCase("orc") || + namespace[0].equalsIgnoreCase("parquet") || + namespace[0].equalsIgnoreCase("avro")); + } + + private long importToIceberg(Identifier destIdent, Identifier sourceIdent, Map<String, String> partitionFilter) { + return modifyIcebergTable(destIdent, table -> { + + validatePartitionSpec(table, partitionFilter); + ensureNameMappingPresent(table); + + if (isFileIdentifier(sourceIdent)) { + Path sourcePath = new Path(sourceIdent.name()); + String format = sourceIdent.namespace()[0]; + importFileTable(table, sourcePath, format, partitionFilter); + } else { + importCatalogTable(table, sourceIdent, partitionFilter); + } + + Snapshot snapshot = table.currentSnapshot(); + return Long.parseLong(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)); + }); + } + + private static void ensureNameMappingPresent(Table table) { + if (table.properties().get(TableProperties.DEFAULT_NAME_MAPPING) == null) { + // Forces Name based resolution instead of position based resolution + NameMapping mapping = MappingUtil.create(table.schema()); + String mappingJson = NameMappingParser.toJson(mapping); + table.updateProperties() + .set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson) + .commit(); + } + } + + private void importFileTable(Table table, Path tableLocation, String format, Map<String, String> partitionFilter) { + // List Partitions via Spark InMemory file search interface + List<SparkPartition> partitions = Spark3Util.getPartitions(spark(), tableLocation, format); + List<SparkPartition> filteredPartitions = SparkTableUtil.filterPartitions(partitions, partitionFilter); + importPartitions(table, filteredPartitions); + } + + private void importCatalogTable(Table table, Identifier sourceIdent, Map<String, String> partitionFilter) { + String stagingLocation = table.properties() + .getOrDefault(TableProperties.WRITE_METADATA_LOCATION, table.location() + "/metadata"); + TableIdentifier sourceTableIdentifier = Spark3Util.toV1TableIdentifier(sourceIdent); + SparkTableUtil.importSparkTable(spark(), sourceTableIdentifier, table, stagingLocation, partitionFilter); + } + + private void importPartitions(Table table, List<SparkTableUtil.SparkPartition> partitions) { + String stagingLocation = table.properties() + .getOrDefault(TableProperties.WRITE_METADATA_LOCATION, table.location() + "/metadata"); + SparkTableUtil.importSparkPartitions(spark(), partitions, table, table.spec(), stagingLocation); + } + + @Override + public String description() { + return null; + } + + private void validatePartitionSpec(Table table, Map<String, String> partitionFilter) { + List<PartitionField> partitionFields = table.spec().fields(); + Set<String> partitionNames = table.spec().fields().stream().map(PartitionField::name).collect(Collectors.toSet()); + + boolean tablePartitioned = !partitionFields.isEmpty(); + boolean partitionSpecPassed = !partitionFilter.isEmpty(); + + if (tablePartitioned && partitionSpecPassed) { + // Not enough partition columns Review comment: nit: I think this comment can be a bit more precise ########## File path: spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java ########## @@ -621,6 +648,20 @@ public static void importSparkPartitions( } } + public static List<SparkPartition> filterPartitions(List<SparkPartition> partitions, + Map<String, 
String> partitionFilter) { + if (partitionFilter.isEmpty()) { + return partitions; + + } else { + List<SparkTableUtil.SparkPartition> filteredPartitions = partitions.stream() Review comment: nit: since we removed the code in between, we no longer need the intermediate var. ``` return partitions.stream() .filter(p -> p.getValues().entrySet().containsAll(partitionFilter.entrySet())) .collect(Collectors.toList()); ``` ########## File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.iceberg.spark.procedures; + +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.mapping.MappingUtil; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkTableUtil; +import org.apache.iceberg.spark.SparkTableUtil.SparkPartition; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.TableIdentifier; +import org.apache.spark.sql.connector.catalog.CatalogPlugin; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import scala.runtime.BoxedUnit; + +class AddFilesProcedure extends BaseProcedure { + + private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{ + ProcedureParameter.required("table", DataTypes.StringType), + ProcedureParameter.required("source_table", DataTypes.StringType), + ProcedureParameter.optional("partition_filter", STRING_MAP) + }; + + private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{ + new StructField("added_files_count", DataTypes.LongType, false, Metadata.empty()) + }); + + private AddFilesProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + public static SparkProcedures.ProcedureBuilder builder() { + return new BaseProcedure.Builder<AddFilesProcedure>() { + @Override + 
protected AddFilesProcedure doBuild() { + return new AddFilesProcedure(tableCatalog()); + } + }; + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { + Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); + + CatalogPlugin sessionCat = spark().sessionState().catalogManager().v2SessionCatalog(); + Identifier sourceIdent = toCatalogAndIdentifier(args.getString(1), PARAMETERS[1].name(), sessionCat).identifier(); + + Map<String, String> partitionFilter = Maps.newHashMap(); + if (!args.isNullAt(2)) { + args.getMap(2).foreach(DataTypes.StringType, DataTypes.StringType, + (k, v) -> { + partitionFilter.put(k.toString(), v.toString()); + return BoxedUnit.UNIT; + }); + } + + long addedFilesCount = importToIceberg(tableIdent, sourceIdent, partitionFilter); + return new InternalRow[]{newInternalRow(addedFilesCount)}; + } + + private boolean isFileIdentifier(Identifier ident) { + String[] namespace = ident.namespace(); + return namespace.length == 1 && + (namespace[0].equalsIgnoreCase("orc") || + namespace[0].equalsIgnoreCase("parquet") || + namespace[0].equalsIgnoreCase("avro")); + } + + private long importToIceberg(Identifier destIdent, Identifier sourceIdent, Map<String, String> partitionFilter) { + return modifyIcebergTable(destIdent, table -> { + + validatePartitionSpec(table, partitionFilter); + ensureNameMappingPresent(table); + + if (isFileIdentifier(sourceIdent)) { + Path sourcePath = new Path(sourceIdent.name()); + String format = sourceIdent.namespace()[0]; + importFileTable(table, sourcePath, format, partitionFilter); + } else { + importCatalogTable(table, sourceIdent, partitionFilter); + } + + Snapshot snapshot = table.currentSnapshot(); + return Long.parseLong(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)); + }); + } + + private static void ensureNameMappingPresent(Table table) { + if (table.properties().get(TableProperties.DEFAULT_NAME_MAPPING) == null) { + // Forces Name based resolution instead of position based resolution + NameMapping mapping = MappingUtil.create(table.schema()); + String mappingJson = NameMappingParser.toJson(mapping); + table.updateProperties() + .set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson) + .commit(); + } + } + + private void importFileTable(Table table, Path tableLocation, String format, Map<String, String> partitionFilter) { + // List Partitions via Spark InMemory file search interface + List<SparkPartition> partitions = Spark3Util.getPartitions(spark(), tableLocation, format); + List<SparkPartition> filteredPartitions = SparkTableUtil.filterPartitions(partitions, partitionFilter); + importPartitions(table, filteredPartitions); + } + + private void importCatalogTable(Table table, Identifier sourceIdent, Map<String, String> partitionFilter) { + String stagingLocation = table.properties() + .getOrDefault(TableProperties.WRITE_METADATA_LOCATION, table.location() + "/metadata"); + TableIdentifier sourceTableIdentifier = Spark3Util.toV1TableIdentifier(sourceIdent); + SparkTableUtil.importSparkTable(spark(), sourceTableIdentifier, table, stagingLocation, partitionFilter); + } + + private void importPartitions(Table table, List<SparkTableUtil.SparkPartition> partitions) { + String stagingLocation = table.properties() + .getOrDefault(TableProperties.WRITE_METADATA_LOCATION, table.location() + "/metadata"); + SparkTableUtil.importSparkPartitions(spark(), 
partitions, table, table.spec(), stagingLocation); + } + + @Override + public String description() { + return null; Review comment: Shall it be "AddFiles" to match other procedures? ########## File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.iceberg.spark.procedures; + +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.mapping.MappingUtil; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkTableUtil; +import org.apache.iceberg.spark.SparkTableUtil.SparkPartition; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.TableIdentifier; +import org.apache.spark.sql.connector.catalog.CatalogPlugin; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import scala.runtime.BoxedUnit; + +class AddFilesProcedure extends BaseProcedure { + + private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{ + ProcedureParameter.required("table", DataTypes.StringType), + ProcedureParameter.required("source_table", DataTypes.StringType), + ProcedureParameter.optional("partition_filter", STRING_MAP) + }; + + private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{ + new StructField("added_files_count", DataTypes.LongType, false, Metadata.empty()) + }); + + private AddFilesProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + public static SparkProcedures.ProcedureBuilder builder() { + return new BaseProcedure.Builder<AddFilesProcedure>() { + @Override + protected AddFilesProcedure doBuild() { + return new AddFilesProcedure(tableCatalog()); + } + }; + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + 
return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { + Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); + + CatalogPlugin sessionCat = spark().sessionState().catalogManager().v2SessionCatalog(); + Identifier sourceIdent = toCatalogAndIdentifier(args.getString(1), PARAMETERS[1].name(), sessionCat).identifier(); + + Map<String, String> partitionFilter = Maps.newHashMap(); + if (!args.isNullAt(2)) { + args.getMap(2).foreach(DataTypes.StringType, DataTypes.StringType, + (k, v) -> { + partitionFilter.put(k.toString(), v.toString()); + return BoxedUnit.UNIT; + }); + } + + long addedFilesCount = importToIceberg(tableIdent, sourceIdent, partitionFilter); + return new InternalRow[]{newInternalRow(addedFilesCount)}; + } + + private boolean isFileIdentifier(Identifier ident) { + String[] namespace = ident.namespace(); + return namespace.length == 1 && + (namespace[0].equalsIgnoreCase("orc") || + namespace[0].equalsIgnoreCase("parquet") || + namespace[0].equalsIgnoreCase("avro")); + } + + private long importToIceberg(Identifier destIdent, Identifier sourceIdent, Map<String, String> partitionFilter) { + return modifyIcebergTable(destIdent, table -> { + + validatePartitionSpec(table, partitionFilter); + ensureNameMappingPresent(table); + + if (isFileIdentifier(sourceIdent)) { + Path sourcePath = new Path(sourceIdent.name()); + String format = sourceIdent.namespace()[0]; + importFileTable(table, sourcePath, format, partitionFilter); + } else { + importCatalogTable(table, sourceIdent, partitionFilter); + } + + Snapshot snapshot = table.currentSnapshot(); + return Long.parseLong(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)); + }); + } + + private static void ensureNameMappingPresent(Table table) { + if (table.properties().get(TableProperties.DEFAULT_NAME_MAPPING) == null) { + // Forces Name based resolution instead of position based resolution + NameMapping mapping = MappingUtil.create(table.schema()); + String mappingJson = NameMappingParser.toJson(mapping); + table.updateProperties() + .set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson) + .commit(); + } + } + + private void importFileTable(Table table, Path tableLocation, String format, Map<String, String> partitionFilter) { + // List Partitions via Spark InMemory file search interface + List<SparkPartition> partitions = Spark3Util.getPartitions(spark(), tableLocation, format); + List<SparkPartition> filteredPartitions = SparkTableUtil.filterPartitions(partitions, partitionFilter); + importPartitions(table, filteredPartitions); + } + + private void importCatalogTable(Table table, Identifier sourceIdent, Map<String, String> partitionFilter) { + String stagingLocation = table.properties() + .getOrDefault(TableProperties.WRITE_METADATA_LOCATION, table.location() + "/metadata"); + TableIdentifier sourceTableIdentifier = Spark3Util.toV1TableIdentifier(sourceIdent); + SparkTableUtil.importSparkTable(spark(), sourceTableIdentifier, table, stagingLocation, partitionFilter); + } + + private void importPartitions(Table table, List<SparkTableUtil.SparkPartition> partitions) { + String stagingLocation = table.properties() + .getOrDefault(TableProperties.WRITE_METADATA_LOCATION, table.location() + "/metadata"); + SparkTableUtil.importSparkPartitions(spark(), partitions, table, table.spec(), stagingLocation); + } + + @Override + public String description() { + return null; + } + + private void validatePartitionSpec(Table table, Map<String, String> partitionFilter) { + List<PartitionField> 
partitionFields = table.spec().fields(); + Set<String> partitionNames = table.spec().fields().stream().map(PartitionField::name).collect(Collectors.toSet()); + + boolean tablePartitioned = !partitionFields.isEmpty(); + boolean partitionSpecPassed = !partitionFilter.isEmpty(); + + if (tablePartitioned && partitionSpecPassed) { + // Not enough partition columns + Preconditions.checkArgument(partitionFields.size() >= partitionFilter.size(), + "Cannot add data files to target table %s because that table is partitioned, " + + "but the number of columns in the provided partition filter (%d) " + + "is greater than the number of partitioned columns in table (%d)", + table.name(), partitionFilter.size(), partitionFields.size()); + + // Check for any non-identity partition columns + List<PartitionField> nonIdentityFields = + partitionFields.stream().filter(x -> !x.transform().isIdentity()).collect(Collectors.toList()); + Preconditions.checkArgument(nonIdentityFields.isEmpty(), Review comment: Sounds like we should check this irrespective of whether we got a filter or not. Also, I'd format it a bit. ``` List<PartitionField> nonIdentityFields = partitionFields.stream() .filter(x -> !x.transform().isIdentity()) .collect(Collectors.toList()); ``` ########## File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + +package org.apache.iceberg.spark.procedures; + +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.mapping.MappingUtil; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkTableUtil; +import org.apache.iceberg.spark.SparkTableUtil.SparkPartition; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.TableIdentifier; +import org.apache.spark.sql.connector.catalog.CatalogPlugin; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import scala.runtime.BoxedUnit; + +class AddFilesProcedure extends BaseProcedure { + + private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{ + ProcedureParameter.required("table", DataTypes.StringType), + ProcedureParameter.required("source_table", DataTypes.StringType), + ProcedureParameter.optional("partition_filter", STRING_MAP) + }; + + private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{ + new StructField("added_files_count", DataTypes.LongType, false, Metadata.empty()) + }); + + private AddFilesProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + public static SparkProcedures.ProcedureBuilder builder() { + return new BaseProcedure.Builder<AddFilesProcedure>() { + @Override + protected AddFilesProcedure doBuild() { + return new AddFilesProcedure(tableCatalog()); + } + }; + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { + Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); + + CatalogPlugin sessionCat = spark().sessionState().catalogManager().v2SessionCatalog(); + Identifier sourceIdent = toCatalogAndIdentifier(args.getString(1), PARAMETERS[1].name(), sessionCat).identifier(); + + Map<String, String> partitionFilter = Maps.newHashMap(); + if (!args.isNullAt(2)) { + args.getMap(2).foreach(DataTypes.StringType, DataTypes.StringType, + (k, v) -> { + partitionFilter.put(k.toString(), v.toString()); + return BoxedUnit.UNIT; + }); + } + + long addedFilesCount = importToIceberg(tableIdent, sourceIdent, partitionFilter); + return new InternalRow[]{newInternalRow(addedFilesCount)}; + } + + private boolean isFileIdentifier(Identifier ident) { + String[] namespace = ident.namespace(); + return namespace.length == 1 && + (namespace[0].equalsIgnoreCase("orc") || + namespace[0].equalsIgnoreCase("parquet") || + namespace[0].equalsIgnoreCase("avro")); + } + + private long importToIceberg(Identifier destIdent, Identifier 
sourceIdent, Map<String, String> partitionFilter) { + return modifyIcebergTable(destIdent, table -> { + + validatePartitionSpec(table, partitionFilter); + ensureNameMappingPresent(table); + + if (isFileIdentifier(sourceIdent)) { + Path sourcePath = new Path(sourceIdent.name()); + String format = sourceIdent.namespace()[0]; + importFileTable(table, sourcePath, format, partitionFilter); + } else { + importCatalogTable(table, sourceIdent, partitionFilter); + } + + Snapshot snapshot = table.currentSnapshot(); + return Long.parseLong(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)); + }); + } + + private static void ensureNameMappingPresent(Table table) { + if (table.properties().get(TableProperties.DEFAULT_NAME_MAPPING) == null) { + // Forces Name based resolution instead of position based resolution + NameMapping mapping = MappingUtil.create(table.schema()); + String mappingJson = NameMappingParser.toJson(mapping); + table.updateProperties() + .set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson) + .commit(); + } + } + + private void importFileTable(Table table, Path tableLocation, String format, Map<String, String> partitionFilter) { + // List Partitions via Spark InMemory file search interface + List<SparkPartition> partitions = Spark3Util.getPartitions(spark(), tableLocation, format); + List<SparkPartition> filteredPartitions = SparkTableUtil.filterPartitions(partitions, partitionFilter); + importPartitions(table, filteredPartitions); + } + + private void importCatalogTable(Table table, Identifier sourceIdent, Map<String, String> partitionFilter) { + String stagingLocation = table.properties() Review comment: nit: should we expose a method for getting the metadata location since we do this in 2 places? ``` private String getMetadataLocation(Table table) { String defaultValue = table.location() + "/metadata"; return table.properties().getOrDefault(TableProperties.WRITE_METADATA_LOCATION, defaultValue); } ``` ########## File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java ########## @@ -0,0 +1,437 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.extensions; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestAddFilesProcedure extends SparkExtensionsTestBase { + + private String sourceTableName = "source_table"; + private File fileTableDir; + + public TestAddFilesProcedure(String catalogName, String implementation, Map<String, String> config) { + super(catalogName, implementation, config); + } + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Before + public void setupTempDirs() { + try { + fileTableDir = temp.newFolder(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @After + public void dropTables() { + sql("DROP TABLE IF EXISTS %s", sourceTableName); + sql("DROP TABLE IF EXISTS %s", sourceTableName); + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void addDataUnpartitioned() { + createUnpartitionedFileTable("parquet"); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`')", + catalogName, tableName, fileTableDir.getAbsolutePath()); + + Assert.assertEquals(2L, importOperation); + + assertEquals("Iceberg table contains correct data", + sql("SELECT * FROM %s ORDER BY id", sourceTableName), + sql("SELECT * FROM %s ORDER BY id", tableName)); + } + + + @Ignore // Classpath issues prevent us from actually writing to a Spark ORC table Review comment: Can we add a couple of tests for Avro? ########## File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java ########## @@ -0,0 +1,437 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
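To illustrate the Avro suggestion in the review comment above, here is a minimal sketch of an unpartitioned Avro variant. It assumes `createUnpartitionedFileTable` can also write an Avro source table and that it produces two data files, as in the Parquet test; both points are assumptions, not shown in this hunk:
```
@Test
public void addDataUnpartitionedAvro() {
  // Assumes the helper can create the source table in Avro format
  createUnpartitionedFileTable("avro");

  sql("CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg", tableName);

  // The backticked namespace selects the file format, so it must be `avro` here
  Object result = scalarSql("CALL %s.system.add_files('%s', '`avro`.`%s`')",
      catalogName, tableName, fileTableDir.getAbsolutePath());

  Assert.assertEquals(2L, result);

  assertEquals("Iceberg table contains correct data",
      sql("SELECT * FROM %s ORDER BY id", sourceTableName),
      sql("SELECT * FROM %s ORDER BY id", tableName));
}
```
A partitioned variant would follow the same pattern with `createPartitionedFileTable("avro")` and the higher expected file count.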
+ */ + +package org.apache.iceberg.spark.extensions; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestAddFilesProcedure extends SparkExtensionsTestBase { + + private String sourceTableName = "source_table"; + private File fileTableDir; + + public TestAddFilesProcedure(String catalogName, String implementation, Map<String, String> config) { + super(catalogName, implementation, config); + } + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Before + public void setupTempDirs() { + try { + fileTableDir = temp.newFolder(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @After + public void dropTables() { + sql("DROP TABLE IF EXISTS %s", sourceTableName); + sql("DROP TABLE IF EXISTS %s", sourceTableName); + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void addDataUnpartitioned() { + createUnpartitionedFileTable("parquet"); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`')", + catalogName, tableName, fileTableDir.getAbsolutePath()); + + Assert.assertEquals(2L, importOperation); + + assertEquals("Iceberg table contains correct data", + sql("SELECT * FROM %s ORDER BY id", sourceTableName), + sql("SELECT * FROM %s ORDER BY id", tableName)); + } + Review comment: nit: extra line ########## File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java ########## @@ -0,0 +1,437 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.extensions; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestAddFilesProcedure extends SparkExtensionsTestBase { + + private String sourceTableName = "source_table"; + private File fileTableDir; + + public TestAddFilesProcedure(String catalogName, String implementation, Map<String, String> config) { + super(catalogName, implementation, config); + } + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Before + public void setupTempDirs() { + try { + fileTableDir = temp.newFolder(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @After + public void dropTables() { + sql("DROP TABLE IF EXISTS %s", sourceTableName); + sql("DROP TABLE IF EXISTS %s", sourceTableName); + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void addDataUnpartitioned() { + createUnpartitionedFileTable("parquet"); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`')", + catalogName, tableName, fileTableDir.getAbsolutePath()); + + Assert.assertEquals(2L, importOperation); + + assertEquals("Iceberg table contains correct data", + sql("SELECT * FROM %s ORDER BY id", sourceTableName), + sql("SELECT * FROM %s ORDER BY id", tableName)); + } + + + @Ignore // Classpath issues prevent us from actually writing to a Spark ORC table + public void addDataUnpartitionedOrc() { + createUnpartitionedFileTable("orc"); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`')", + catalogName, tableName, fileTableDir.getAbsolutePath()); + + Assert.assertEquals(2L, importOperation); + + assertEquals("Iceberg table contains correct data", + sql("SELECT * FROM %s ORDER BY id", sourceTableName), + sql("SELECT * FROM %s ORDER BY id", tableName)); + } + + @Test + public void addDataUnpartitionedHive() { + createUnpartitionedHiveTable(); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '%s')", + catalogName, tableName, sourceTableName); + + Assert.assertEquals(2L, importOperation); + + assertEquals("Iceberg table contains correct data", + sql("SELECT * FROM %s ORDER BY id", sourceTableName), + sql("SELECT * FROM %s ORDER BY id", tableName)); + } + + @Test + public void addDataUnpartitionedExtraCol() { + createUnpartitionedFileTable("parquet"); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String, subdept String, foo string) USING 
iceberg"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`')", + catalogName, tableName, fileTableDir.getAbsolutePath()); + + Assert.assertEquals(2L, importOperation); + + assertEquals("Iceberg table contains correct data", + sql("SELECT * FROM %s ORDER BY id", sourceTableName), + sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName)); + } + + @Test + public void addDataUnpartitionedMissingCol() { + createUnpartitionedFileTable("parquet"); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String) USING iceberg"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`')", + catalogName, tableName, fileTableDir.getAbsolutePath()); + + Assert.assertEquals(2L, importOperation); + + assertEquals("Iceberg table contains correct data", + sql("SELECT id, name, dept FROM %s ORDER BY id", sourceTableName), + sql("SELECT * FROM %s ORDER BY id", tableName)); + } + + @Test + public void addDataPartitionedMissingCol() { + createPartitionedFileTable("parquet"); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String) USING iceberg PARTITIONED BY (id)"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`')", + catalogName, tableName, fileTableDir.getAbsolutePath()); + + Assert.assertEquals(8L, importOperation); + + assertEquals("Iceberg table contains correct data", + sql("SELECT id, name, dept FROM %s ORDER BY id", sourceTableName), + sql("SELECT * FROM %s ORDER BY id", tableName)); + } + + @Test + public void addDataPartitioned() { + createPartitionedFileTable("parquet"); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`')", + catalogName, tableName, fileTableDir.getAbsolutePath()); + + Assert.assertEquals(8L, importOperation); + + assertEquals("Iceberg table contains correct data", + sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", sourceTableName), + sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName)); + } + + @Ignore // Classpath issues prevent us from actually writing to a Spark ORC table + public void addDataPartitionedOrc() { + createPartitionedFileTable("orc"); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`')", + catalogName, tableName, fileTableDir.getAbsolutePath()); + + Assert.assertEquals(8L, importOperation); + + assertEquals("Iceberg table contains correct data", + sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", sourceTableName), + sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName)); + } + + @Test + public void addDataPartitionedHive() { + createPartitionedHiveTable(); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '%s')", + catalogName, tableName, sourceTableName); + + Assert.assertEquals(8L, importOperation); + + assertEquals("Iceberg table contains 
correct data", + sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", sourceTableName), + sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName)); + } + + @Test + public void addPartitionToPartitioned() { + createPartitionedFileTable("parquet"); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 1))", + catalogName, tableName, fileTableDir.getAbsolutePath()); + + Assert.assertEquals(2L, importOperation); + + assertEquals("Iceberg table contains correct data", + sql("SELECT id, name, dept, subdept FROM %s WHERE id = 1 ORDER BY id", sourceTableName), + sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName)); + } + + @Test + public void addFilteredPartitionsToPartitioned() { + createCompositePartitionedTable("parquet"); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg " + + "PARTITIONED BY (id, dept)"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 1))", + catalogName, tableName, fileTableDir.getAbsolutePath()); + + Assert.assertEquals(2L, importOperation); + + assertEquals("Iceberg table contains correct data", + sql("SELECT id, name, dept, subdept FROM %s WHERE id = 1 ORDER BY id", sourceTableName), + sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName)); + } + + @Test + public void addPartitionToPartitionedHive() { + createPartitionedHiveTable(); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '%s', map('id', 1))", + catalogName, tableName, sourceTableName); + + Assert.assertEquals(2L, importOperation); + + assertEquals("Iceberg table contains correct data", + sql("SELECT id, name, dept, subdept FROM %s WHERE id = 1 ORDER BY id", sourceTableName), + sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName)); + } + + @Test + public void invalidDataImport() { + createPartitionedFileTable("parquet"); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; + + sql(createIceberg, tableName); + + AssertHelpers.assertThrows("Should forbid adding of partitioned data to unpartitioned table", Review comment: Should we test the scenario without the partition filter too? ########## File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
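Picking up the question in the review comment above about also covering the case without a partition filter, a hedged sketch is below. The expected exception type and message fragment are assumptions, since the quoted `assertThrows` call is truncated in this hunk:
```
@Test
public void invalidDataImportNoPartitionFilter() {
  createPartitionedFileTable("parquet");

  sql("CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg", tableName);

  // Same invalid import as above, but without the partition_filter argument
  AssertHelpers.assertThrows("Should forbid adding partitioned data to an unpartitioned table",
      IllegalArgumentException.class, "partition",
      () -> {
        scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`')",
            catalogName, tableName, fileTableDir.getAbsolutePath());
      });
}
```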
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.iceberg.spark.procedures; + +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.mapping.MappingUtil; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkTableUtil; +import org.apache.iceberg.spark.SparkTableUtil.SparkPartition; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.TableIdentifier; +import org.apache.spark.sql.connector.catalog.CatalogPlugin; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import scala.runtime.BoxedUnit; + +class AddFilesProcedure extends BaseProcedure { + + private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{ + ProcedureParameter.required("table", DataTypes.StringType), + ProcedureParameter.required("source_table", DataTypes.StringType), + ProcedureParameter.optional("partition_filter", STRING_MAP) + }; + + private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{ + new StructField("added_files_count", DataTypes.LongType, false, Metadata.empty()) + }); + + private AddFilesProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + public static SparkProcedures.ProcedureBuilder builder() { + return new BaseProcedure.Builder<AddFilesProcedure>() { + @Override + protected AddFilesProcedure doBuild() { + return new AddFilesProcedure(tableCatalog()); + } + }; + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { + Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); + + CatalogPlugin sessionCat = spark().sessionState().catalogManager().v2SessionCatalog(); + Identifier sourceIdent = toCatalogAndIdentifier(args.getString(1), PARAMETERS[1].name(), sessionCat).identifier(); + + Map<String, String> partitionFilter = Maps.newHashMap(); + if (!args.isNullAt(2)) { + args.getMap(2).foreach(DataTypes.StringType, DataTypes.StringType, + (k, v) -> { + partitionFilter.put(k.toString(), v.toString()); + return BoxedUnit.UNIT; + }); + } + + long addedFilesCount = importToIceberg(tableIdent, sourceIdent, partitionFilter); + return new InternalRow[]{newInternalRow(addedFilesCount)}; + } + + private boolean isFileIdentifier(Identifier ident) { + String[] namespace = ident.namespace(); + return namespace.length == 1 && + (namespace[0].equalsIgnoreCase("orc") || + namespace[0].equalsIgnoreCase("parquet") || + 
namespace[0].equalsIgnoreCase("avro")); + } + + private long importToIceberg(Identifier destIdent, Identifier sourceIdent, Map<String, String> partitionFilter) { + return modifyIcebergTable(destIdent, table -> { + + validatePartitionSpec(table, partitionFilter); + ensureNameMappingPresent(table); + + if (isFileIdentifier(sourceIdent)) { + Path sourcePath = new Path(sourceIdent.name()); + String format = sourceIdent.namespace()[0]; + importFileTable(table, sourcePath, format, partitionFilter); + } else { + importCatalogTable(table, sourceIdent, partitionFilter); + } + + Snapshot snapshot = table.currentSnapshot(); + return Long.parseLong(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)); + }); + } + + private static void ensureNameMappingPresent(Table table) { + if (table.properties().get(TableProperties.DEFAULT_NAME_MAPPING) == null) { + // Forces Name based resolution instead of position based resolution + NameMapping mapping = MappingUtil.create(table.schema()); + String mappingJson = NameMappingParser.toJson(mapping); + table.updateProperties() + .set(TableProperties.DEFAULT_NAME_MAPPING, mappingJson) + .commit(); + } + } + + private void importFileTable(Table table, Path tableLocation, String format, Map<String, String> partitionFilter) { + // List Partitions via Spark InMemory file search interface + List<SparkPartition> partitions = Spark3Util.getPartitions(spark(), tableLocation, format); + List<SparkPartition> filteredPartitions = SparkTableUtil.filterPartitions(partitions, partitionFilter); Review comment: Do we need to validate the list is non-empty? ########## File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java ########## @@ -0,0 +1,437 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
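On the non-empty check raised in the review comment above, one way it could look inside `importFileTable` is sketched here; the exact error message wording is an assumption:
```
// Fail fast if the partition filter (or the location itself) matched nothing
Preconditions.checkArgument(!filteredPartitions.isEmpty(),
    "Cannot find any matching partitions in %s for filter %s", tableLocation, partitionFilter);
importPartitions(table, filteredPartitions);
```
`Preconditions` is already imported in this class, so no extra imports would be needed.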
+ */ + +package org.apache.iceberg.spark.extensions; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestAddFilesProcedure extends SparkExtensionsTestBase { + + private String sourceTableName = "source_table"; + private File fileTableDir; + + public TestAddFilesProcedure(String catalogName, String implementation, Map<String, String> config) { + super(catalogName, implementation, config); + } + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Before + public void setupTempDirs() { + try { + fileTableDir = temp.newFolder(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @After + public void dropTables() { + sql("DROP TABLE IF EXISTS %s", sourceTableName); + sql("DROP TABLE IF EXISTS %s", sourceTableName); + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void addDataUnpartitioned() { + createUnpartitionedFileTable("parquet"); + + String createIceberg = + "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg"; + + sql(createIceberg, tableName); + + Object importOperation = scalarSql("CALL %s.system.add_files('%s', '`parquet`.`%s`')", Review comment: It seems like `result` would be a more appropriate name for such vars. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
