RussellSpitzer commented on code in PR #4325:
URL: https://github.com/apache/iceberg/pull/4325#discussion_r922406943
##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java:
##########
@@ -375,12 +376,67 @@ private static Iterator<ManifestFile>
buildManifest(SerializableConfiguration co
* @param sourceTableIdent an identifier of the source Spark table
* @param targetTable an Iceberg table where to import the data
* @param stagingDir a staging directory to store temporary manifest files
- * @param partitionFilter only import partitions whose values match those in
the map, can be partially defined
+ */
+ public static ImportSparkTableBuilder importSparkTableBuilder(SparkSession
spark, TableIdentifier sourceTableIdent,
+ Table
targetTable, String stagingDir) {
+ return new ImportSparkTableBuilder(spark, sourceTableIdent, targetTable,
stagingDir);
+ }
+
+ public static class ImportSparkTableBuilder {
+ private final SparkSession spark;
+ private final TableIdentifier sourceTableIdent;
+ private final Table targetTable;
+ private final String stagingDir;
+ private Map<String, String> partitionFilter = Collections.emptyMap();
+ private boolean checkDuplicateFiles = false;
+ private boolean skipOnError = false;
+
+ public ImportSparkTableBuilder(SparkSession spark, TableIdentifier
sourceTableIdent, Table targetTable,
+ String stagingDir) {
+ this.spark = spark;
+ this.sourceTableIdent = sourceTableIdent;
+ this.targetTable = targetTable;
+ this.stagingDir = stagingDir;
+ }
+
+ public ImportSparkTableBuilder partitionFilter(Map<String, String>
newPartitionFilter) {
+ this.partitionFilter = newPartitionFilter;
+ return this;
+ }
+
+ public ImportSparkTableBuilder checkDuplicateFiles(boolean
newCheckDuplicateFiles) {
+ this.checkDuplicateFiles = newCheckDuplicateFiles;
+ return this;
+ }
+
+ public ImportSparkTableBuilder skipOnError(boolean newSkipOnError) {
+ this.skipOnError = newSkipOnError;
+ return this;
+ }
+
+ public void execute() {
+ importSparkTable(spark, sourceTableIdent, targetTable, stagingDir,
partitionFilter, checkDuplicateFiles,
+ skipOnError);
+ }
+ }
+
+ /**
+ * Import files from an existing Spark table to an Iceberg table.
+ *
+ * The import uses the Spark session to get table metadata. It assumes no
+ * operation is going on the original and target table and thus is not
+ * thread-safe.
+ *
+ * @param spark a Spark session
+ * @param sourceTableIdent an identifier of the source Spark table
+ * @param targetTable an Iceberg table where to import the data
+ * @param stagingDir a staging directory to store temporary manifest files
* @param checkDuplicateFiles if true, throw exception if import results in
a duplicate data file
+ * @param skipOnError if true, skip files which cannot be imported into
Iceberg
*/
public static void importSparkTable(SparkSession spark, TableIdentifier
sourceTableIdent, Table targetTable,
String stagingDir, Map<String, String>
partitionFilter,
- boolean checkDuplicateFiles) {
+ boolean checkDuplicateFiles, boolean
skipOnError) {
Review Comment:
Oh, never mind, I see it below.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]