This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dd269b  Spark: Convert SparkTableUtil from Scala to Java (#1126)
8dd269b is described below

commit 8dd269bd5f1cc0c5260c83c22a1e010a6f92e460
Author: Thiru Paramasivan <[email protected]>
AuthorDate: Mon Jun 29 17:59:41 2020 -0700

    Spark: Convert SparkTableUtil from Scala to Java (#1126)
---
 baseline.gradle                                    |   1 -
 build.gradle                                       |  20 +-
 project/scalastyle_config.xml                      | 150 -----
 .../apache/iceberg/spark/SparkExceptionUtil.java   |  62 ++
 .../org/apache/iceberg/spark/SparkTableUtil.java   | 632 +++++++++++++++++++++
 .../org/apache/iceberg/spark/SparkTableUtil.scala  | 457 ---------------
 .../iceberg/spark/source/TestSparkTableUtil.java   |   5 +-
 .../TestSparkTableUtilWithInMemoryCatalog.java     |   5 +-
 8 files changed, 699 insertions(+), 633 deletions(-)

diff --git a/baseline.gradle b/baseline.gradle
index e708d9c..3b48c59 100644
--- a/baseline.gradle
+++ b/baseline.gradle
@@ -36,7 +36,6 @@ subprojects {
     apply plugin: 'com.palantir.baseline-checkstyle'
     apply plugin: 'com.palantir.baseline-error-prone'
   }
-  apply plugin: 'com.palantir.baseline-scalastyle'
   apply plugin: 'com.palantir.baseline-class-uniqueness'
   apply plugin: 'com.palantir.baseline-reproducibility'
   apply plugin: 'com.palantir.baseline-exact-dependencies'
diff --git a/build.gradle b/build.gradle
index 406c863..52c06dc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -79,8 +79,7 @@ subprojects {
 
     compileClasspath {
       // do not exclude Guava so the bundle project can reference classes.
-      // the Spark module is also excluded because this breaks the Scala 
compiler
-      if (project.name != 'iceberg-bundled-guava' && project.name != 
'iceberg-spark' && project.name != 'iceberg-spark2') {
+      if (project.name != 'iceberg-bundled-guava') {
         exclude group: 'com.google.guava', module: 'guava'
       }
     }
@@ -413,16 +412,7 @@ project(':iceberg-arrow') {
 }
 
 project(':iceberg-spark') {
-  apply plugin: 'scala'
-
   configurations.all {
-    // this is needed to avoid a problem with dependency locking. it was
-    // suggested as a work-around here:
-    //  https://github.com/gradle/gradle/issues/6750
-    if (name.startsWith("incrementalScalaAnalysis")) {
-      extendsFrom = []
-    }
-
     resolutionStrategy {
       // Spark 2.4.4 can only use the below datanucleus version, the versions 
introduced
       // by Hive 2.3.6 will meet lots of unexpected issues, so here force to 
use the versions
@@ -472,16 +462,8 @@ project(':iceberg-spark') {
 }
 
 project(':iceberg-spark2') {
-  apply plugin: 'scala'
 
   configurations.all {
-    // this is needed to avoid a problem with dependency locking. it was
-    // suggested as a work-around here:
-    //  https://github.com/gradle/gradle/issues/6750
-    if (name.startsWith("incrementalScalaAnalysis")) {
-      extendsFrom = []
-    }
-
     resolutionStrategy {
       // Spark 2.4.4 can only use the below datanucleus version, the versions 
introduced
       // by Hive 2.3.6 will meet lots of unexpected issues, so here force to 
use the versions
diff --git a/project/scalastyle_config.xml b/project/scalastyle_config.xml
deleted file mode 100644
index 04333bb..0000000
--- a/project/scalastyle_config.xml
+++ /dev/null
@@ -1,150 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<scalastyle commentFilter="enabled">
-    <name>Iceberg Scalastyle configuration</name>
-    <check level="error" class="org.scalastyle.file.FileTabChecker" 
enabled="true"/>
-    <check level="error" class="org.scalastyle.file.FileLengthChecker" 
enabled="true">
-        <parameters>
-            <parameter name="maxFileLength"><![CDATA[800]]></parameter>
-        </parameters>
-    </check>
-    <check level="error" class="org.scalastyle.file.HeaderMatchesChecker" 
enabled="true">
-        <parameters>
-            <parameter name="regex">true</parameter>
-            <parameter name="header">(?m)^/\*$\n^ \* Licensed to the Apache 
Software Foundation \(ASF\) under one$</parameter>
-        </parameters>
-    </check>
-    <check level="error" 
class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"/>
-    <check level="error" 
class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="true"/>
-    <check level="error" 
class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"/>
-    <check level="error" class="org.scalastyle.file.FileLineLengthChecker" 
enabled="true">
-        <parameters>
-            <parameter name="maxLineLength"><![CDATA[120]]></parameter>
-            <parameter name="tabSize"><![CDATA[4]]></parameter>
-        </parameters>
-    </check>
-    <check level="error" class="org.scalastyle.scalariform.ClassNamesChecker" 
enabled="true">
-        <parameters>
-            <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
-        </parameters>
-    </check>
-    <check level="error" class="org.scalastyle.scalariform.ObjectNamesChecker" 
enabled="true">
-        <parameters>
-            <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
-        </parameters>
-    </check>
-    <check level="error" 
class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true">
-        <parameters>
-            <parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter>
-        </parameters>
-    </check>
-    <check level="error" 
class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="true"/>
-    <check level="error" 
class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true">
-        <parameters>
-            <parameter 
name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter>
-        </parameters>
-    </check>
-    <check level="error" 
class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
-        <parameters>
-            <parameter name="maxParameters"><![CDATA[8]]></parameter>
-        </parameters>
-    </check>
-    <check level="error" class="org.scalastyle.scalariform.MagicNumberChecker" 
enabled="true">
-        <parameters>
-            <parameter name="ignore"><![CDATA[-1,0,1,2,3]]></parameter>
-        </parameters>
-    </check>
-    <check level="error" 
class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" 
enabled="true"/>
-    <check level="error" 
class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" 
enabled="true"/>
-    <check level="error" class="org.scalastyle.scalariform.ReturnChecker" 
enabled="false"/>
-    <check level="error" class="org.scalastyle.scalariform.NullChecker" 
enabled="false"/>
-    <check level="error" class="org.scalastyle.scalariform.NoCloneChecker" 
enabled="true"/>
-    <check level="error" class="org.scalastyle.scalariform.NoFinalizeChecker" 
enabled="true"/>
-    <check level="error" 
class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"/>
-    <check level="error" 
class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"/>
-    <check level="error" class="org.scalastyle.file.RegexChecker" 
enabled="true">
-        <parameters>
-            <parameter name="regex"><![CDATA[println]]></parameter>
-        </parameters>
-    </check>
-    <check level="error" 
class="org.scalastyle.scalariform.NumberOfTypesChecker" enabled="true">
-        <parameters>
-            <parameter name="maxTypes"><![CDATA[30]]></parameter>
-        </parameters>
-    </check>
-    <check level="error" 
class="org.scalastyle.scalariform.CyclomaticComplexityChecker" enabled="true">
-        <parameters>
-            <parameter name="maximum"><![CDATA[10]]></parameter>
-        </parameters>
-    </check>
-    <check level="error" class="org.scalastyle.scalariform.UppercaseLChecker" 
enabled="true"/>
-    <check level="error" 
class="org.scalastyle.scalariform.SimplifyBooleanExpressionChecker" 
enabled="true"/>
-    <check level="error" class="org.scalastyle.scalariform.IfBraceChecker" 
enabled="true">
-        <parameters>
-            <parameter name="singleLineAllowed"><![CDATA[true]]></parameter>
-            <parameter name="doubleLineAllowed"><![CDATA[false]]></parameter>
-        </parameters>
-    </check>
-    <check level="error" 
class="org.scalastyle.scalariform.MethodLengthChecker" enabled="true">
-        <parameters>
-            <parameter name="maxLength"><![CDATA[50]]></parameter>
-        </parameters>
-    </check>
-    <check level="error" class="org.scalastyle.scalariform.MethodNamesChecker" 
enabled="true">
-        <parameters>
-            <parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter>
-        </parameters>
-    </check>
-    <check level="error" 
class="org.scalastyle.scalariform.NumberOfMethodsInTypeChecker" enabled="false">
-        <parameters>
-            <parameter name="maxMethods"><![CDATA[30]]></parameter>
-        </parameters>
-    </check>
-    <check level="error" 
class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"/>
-    <check level="error" class="org.scalastyle.file.NewLineAtEofChecker" 
enabled="true"/>
-    <check level="error" class="org.scalastyle.file.NoNewLineAtEofChecker" 
enabled="false"/>
-    <check level="error" class="org.scalastyle.scalariform.WhileChecker" 
enabled="false"/>
-    <check level="error" class="org.scalastyle.scalariform.VarFieldChecker" 
enabled="false"/>
-    <check level="error" class="org.scalastyle.scalariform.VarLocalChecker" 
enabled="false"/>
-    <check level="error" class="org.scalastyle.scalariform.RedundantIfChecker" 
enabled="false"/>
-    <check level="error" class="org.scalastyle.scalariform.TokenChecker" 
enabled="false">
-        <parameters>
-            <parameter name="regex"><![CDATA[println]]></parameter>
-        </parameters>
-    </check>
-    <check level="error" 
class="org.scalastyle.scalariform.DeprecatedJavaChecker" enabled="true"/>
-    <check level="error" class="org.scalastyle.scalariform.EmptyClassChecker" 
enabled="true"/>
-    <check level="error" 
class="org.scalastyle.scalariform.ClassTypeParameterChecker" enabled="true">
-        <parameters>
-            <parameter name="regex"><![CDATA[^[A-Z_]$]]></parameter>
-        </parameters>
-    </check>
-    <check level="error" 
class="org.scalastyle.scalariform.UnderscoreImportChecker" enabled="false"/>
-    <check level="error" 
class="org.scalastyle.scalariform.LowercasePatternMatchChecker" enabled="true"/>
-    <check level="error" class="org.scalastyle.scalariform.ImportOrderChecker" 
enabled="true">
-        <parameters>
-            <parameter name="groups">all</parameter>
-            <parameter name="group.all">.+</parameter>
-        </parameters>
-    </check>
-    <check level="error" 
class="org.scalastyle.scalariform.DisallowSpaceBeforeTokenChecker" 
enabled="true">
-        <parameters>
-            <parameter name="tokens">COMMA</parameter>
-        </parameters>
-    </check>
-</scalastyle>
diff --git 
a/spark/src/main/java/org/apache/iceberg/spark/SparkExceptionUtil.java 
b/spark/src/main/java/org/apache/iceberg/spark/SparkExceptionUtil.java
new file mode 100644
index 0000000..88b621b
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkExceptionUtil.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.io.IOException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.spark.sql.AnalysisException;
+
+public class SparkExceptionUtil {
+
+  private SparkExceptionUtil() {
+  }
+
+  /**
+   * Converts checked exceptions to unchecked exceptions.
+   *
+   * @param cause a checked exception object which is to be converted to its 
unchecked equivalent.
+   * @param message exception message as a format string
+   * @param args format specifiers
+   * @return unchecked exception.
+   */
+  public static RuntimeException toUncheckedException(Throwable cause, String 
message, Object... args) {
+    if (cause instanceof RuntimeException) {
+      return (RuntimeException) cause;
+
+    } else if (cause instanceof 
org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException) {
+      return new NoSuchNamespaceException(cause, message, args);
+
+    } else if (cause instanceof 
org.apache.spark.sql.catalyst.analysis.NoSuchTableException) {
+      return new NoSuchTableException(cause, message, args);
+
+    } else if (cause instanceof AnalysisException) {
+      return new ValidationException(cause, message, args);
+
+    } else if (cause instanceof IOException) {
+      return new RuntimeIOException((IOException) cause, message, args);
+
+    } else {
+      return new RuntimeException(String.format(message, args), cause);
+    }
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java 
b/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
new file mode 100644
index 0000000..38b6fa8
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.hadoop.SerializableConfiguration;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.orc.OrcMetrics;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition;
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.expressions.NamedExpression;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import scala.Function2;
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+import scala.runtime.AbstractPartialFunction;
+
+import static org.apache.spark.sql.functions.col;
+
+/**
+ * Java version of the original SparkTableUtil.scala
+ * 
https://github.com/apache/iceberg/blob/apache-iceberg-0.8.0-incubating/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
+ */
+public class SparkTableUtil {
+
+  private static final PathFilter HIDDEN_PATH_FILTER =
+      p -> !p.getName().startsWith("_") && !p.getName().startsWith(".");
+
+  private SparkTableUtil() {
+  }
+
+  /**
+   * Returns a DataFrame with a row for each partition in the table.
+   *
+   * The DataFrame has 3 columns, partition key (a=1/b=2), partition location, 
and format
+   * (avro or parquet).
+   *
+   * @param spark a Spark session
+   * @param table a table name and (optional) database
+   * @return a DataFrame of the table's partitions
+   */
+  public static Dataset<Row> partitionDF(SparkSession spark, String table) {
+    List<SparkPartition> partitions = getPartitions(spark, table);
+    return spark.createDataFrame(partitions, 
SparkPartition.class).toDF("partition", "uri", "format");
+  }
+
+  /**
+   * Returns a DataFrame with a row for each partition that matches the 
specified 'expression'.
+   *
+   * @param spark a Spark session.
+   * @param table name of the table.
+   * @param expression The expression whose matching partitions are returned.
+   * @return a DataFrame of the table partitions.
+   */
+  public static Dataset<Row> partitionDFByFilter(SparkSession spark, String 
table, String expression) {
+    List<SparkPartition> partitions = getPartitionsByFilter(spark, table, 
expression);
+    return spark.createDataFrame(partitions, 
SparkPartition.class).toDF("partition", "uri", "format");
+  }
+
+  /**
+   * Returns all partitions in the table.
+   *
+   * @param spark a Spark session
+   * @param table a table name and (optional) database
+   * @return all table's partitions
+   */
+  public static List<SparkPartition> getPartitions(SparkSession spark, String 
table) {
+    try {
+      TableIdentifier tableIdent = 
spark.sessionState().sqlParser().parseTableIdentifier(table);
+      return getPartitions(spark, tableIdent);
+    } catch (ParseException e) {
+      throw SparkExceptionUtil.toUncheckedException(e, "Unable to parse table 
identifier: %s", table);
+    }
+  }
+
+  /**
+   * Returns all partitions in the table.
+   *
+   * @param spark a Spark session
+   * @param tableIdent a table identifier
+   * @return all table's partitions
+   */
+  public static List<SparkPartition> getPartitions(SparkSession spark, 
TableIdentifier tableIdent) {
+    try {
+      SessionCatalog catalog = spark.sessionState().catalog();
+      CatalogTable catalogTable = catalog.getTableMetadata(tableIdent);
+
+      Seq<CatalogTablePartition> partitions = 
catalog.listPartitions(tableIdent, Option.empty());
+
+      return JavaConverters
+          .seqAsJavaListConverter(partitions)
+          .asJava()
+          .stream()
+          .map(catalogPartition -> toSparkPartition(catalogPartition, 
catalogTable))
+          .collect(Collectors.toList());
+    } catch (NoSuchDatabaseException e) {
+      throw SparkExceptionUtil.toUncheckedException(e, "Unknown table: %s. 
Database not found in catalog.", tableIdent);
+    } catch (NoSuchTableException e) {
+      throw SparkExceptionUtil.toUncheckedException(e, "Unknown table: %s. 
Table not found in catalog.", tableIdent);
+    }
+  }
+
+  /**
+   * Returns partitions that match the specified 'predicate'.
+   *
+   * @param spark a Spark session
+   * @param table a table name and (optional) database
+   * @param predicate a predicate on partition columns
+   * @return matching table's partitions
+   */
+  public static List<SparkPartition> getPartitionsByFilter(SparkSession spark, 
String table, String predicate) {
+    TableIdentifier tableIdent;
+    try {
+      tableIdent = 
spark.sessionState().sqlParser().parseTableIdentifier(table);
+    } catch (ParseException e) {
+      throw SparkExceptionUtil.toUncheckedException(e, "Unable to parse the 
table identifier: %s", table);
+    }
+
+    Expression unresolvedPredicateExpr;
+    try {
+      unresolvedPredicateExpr = 
spark.sessionState().sqlParser().parseExpression(predicate);
+    } catch (ParseException e) {
+      throw SparkExceptionUtil.toUncheckedException(e, "Unable to parse the 
predicate expression: %s", predicate);
+    }
+
+    Expression resolvedPredicateExpr = resolveAttrs(spark, table, 
unresolvedPredicateExpr);
+    return getPartitionsByFilter(spark, tableIdent, resolvedPredicateExpr);
+  }
+
+  /**
+   * Returns partitions that match the specified 'predicate'.
+   *
+   * @param spark a Spark session
+   * @param tableIdent a table identifier
+   * @param predicateExpr a predicate expression on partition columns
+   * @return matching table's partitions
+   */
+  public static List<SparkPartition> getPartitionsByFilter(SparkSession spark, 
TableIdentifier tableIdent,
+                                                           Expression 
predicateExpr) {
+    try {
+      SessionCatalog catalog = spark.sessionState().catalog();
+      CatalogTable catalogTable = catalog.getTableMetadata(tableIdent);
+
+      Expression resolvedPredicateExpr;
+      if (!predicateExpr.resolved()) {
+        resolvedPredicateExpr = resolveAttrs(spark, tableIdent.quotedString(), 
predicateExpr);
+      } else {
+        resolvedPredicateExpr = predicateExpr;
+      }
+      Seq<Expression> predicates = JavaConverters
+          
.collectionAsScalaIterableConverter(ImmutableList.of(resolvedPredicateExpr))
+          .asScala().toSeq();
+
+      Seq<CatalogTablePartition> partitions = 
catalog.listPartitionsByFilter(tableIdent, predicates);
+
+      return JavaConverters
+          .seqAsJavaListConverter(partitions)
+          .asJava()
+          .stream()
+          .map(catalogPartition -> toSparkPartition(catalogPartition, 
catalogTable))
+          .collect(Collectors.toList());
+    } catch (NoSuchDatabaseException e) {
+      throw SparkExceptionUtil.toUncheckedException(e, "Unknown table: %s. 
Database not found in catalog.", tableIdent);
+    } catch (NoSuchTableException e) {
+      throw SparkExceptionUtil.toUncheckedException(e, "Unknown table: %s. 
Table not found in catalog.", tableIdent);
+    }
+  }
+
+  /**
+   * Returns the data files in a partition by listing the partition location.
+   *
+   * For Parquet and ORC partitions, this will read metrics from the file 
footer. For Avro partitions,
+   * metrics are set to null.
+   *
+   * @param partition a partition
+   * @param conf a serializable Hadoop conf
+   * @param metricsConfig a metrics conf
+   * @return a List of DataFile
+   */
+  public static List<DataFile> listPartition(SparkPartition partition, 
PartitionSpec spec,
+                                             SerializableConfiguration conf, 
MetricsConfig metricsConfig) {
+    return listPartition(partition.values, partition.uri, partition.format, 
spec, conf.get(), metricsConfig);
+  }
+
+  /**
+   * Returns the data files in a partition by listing the partition location.
+   *
+   * For Parquet and ORC partitions, this will read metrics from the file 
footer. For Avro partitions,
+   * metrics are set to null.
+   *
+   * @param partition partition key, e.g., "a=1/b=2"
+   * @param uri partition location URI
+   * @param format partition format, avro or parquet
+   * @param conf a Hadoop conf
+   * @param metricsConfig a metrics conf
+   * @return a List of DataFile
+   */
+  public static List<DataFile> listPartition(Map<String, String> partition, 
String uri, String format,
+                                             PartitionSpec spec, Configuration 
conf, MetricsConfig metricsConfig) {
+    if (format.contains("avro")) {
+      return listAvroPartition(partition, uri, spec, conf);
+    } else if (format.contains("parquet")) {
+      return listParquetPartition(partition, uri, spec, conf, metricsConfig);
+    } else if (format.contains("orc")) {
+      // TODO: use MetricsConfig in listOrcPartition
+      return listOrcPartition(partition, uri, spec, conf);
+    } else {
+      throw new UnsupportedOperationException("Unknown partition format: " + 
format);
+    }
+  }
+
+  private static List<DataFile> listAvroPartition(
+      Map<String, String> partitionPath, String partitionUri, PartitionSpec 
spec, Configuration conf) {
+    try {
+      Path partition = new Path(partitionUri);
+      FileSystem fs = partition.getFileSystem(conf);
+      return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
+          .filter(FileStatus::isFile)
+          .map(stat -> {
+            Metrics metrics = new Metrics(-1L, null, null, null);
+            String partitionKey = spec.fields().stream()
+                .map(PartitionField::name)
+                .map(name -> String.format("%s=%s", name, 
partitionPath.get(name)))
+                .collect(Collectors.joining("/"));
+
+            return DataFiles.builder(spec)
+                .withPath(stat.getPath().toString())
+                .withFormat("avro")
+                .withFileSizeInBytes(stat.getLen())
+                .withMetrics(metrics)
+                .withPartitionPath(partitionKey)
+                .build();
+
+          }).collect(Collectors.toList());
+    } catch (IOException e) {
+      throw SparkExceptionUtil.toUncheckedException(e, "Unable to list files 
in partition: %s", partitionUri);
+    }
+  }
+
+  private static List<DataFile> listParquetPartition(Map<String, String> 
partitionPath, String partitionUri,
+                                                     PartitionSpec spec, 
Configuration conf,
+                                                     MetricsConfig 
metricsSpec) {
+    try {
+      Path partition = new Path(partitionUri);
+      FileSystem fs = partition.getFileSystem(conf);
+
+      return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
+          .filter(FileStatus::isFile)
+          .map(stat -> {
+            Metrics metrics;
+            try {
+              metrics = 
ParquetUtil.footerMetrics(ParquetFileReader.readFooter(conf, stat), 
metricsSpec);
+            } catch (IOException e) {
+              throw SparkExceptionUtil.toUncheckedException(
+                  e, "Unable to read the footer of the parquet file: %s", 
stat.getPath());
+            }
+            String partitionKey = spec.fields().stream()
+                .map(PartitionField::name)
+                .map(name -> String.format("%s=%s", name, 
partitionPath.get(name)))
+                .collect(Collectors.joining("/"));
+
+            return DataFiles.builder(spec)
+                .withPath(stat.getPath().toString())
+                .withFormat("parquet")
+                .withFileSizeInBytes(stat.getLen())
+                .withMetrics(metrics)
+                .withPartitionPath(partitionKey)
+                .build();
+
+          }).collect(Collectors.toList());
+    } catch (IOException e) {
+      throw SparkExceptionUtil.toUncheckedException(e, "Unable to list files 
in partition: %s", partitionUri);
+    }
+  }
+
+  private static List<DataFile> listOrcPartition(
+      Map<String, String> partitionPath, String partitionUri, PartitionSpec 
spec, Configuration conf) {
+    try {
+      Path partition = new Path(partitionUri);
+      FileSystem fs = partition.getFileSystem(conf);
+
+      return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
+          .filter(FileStatus::isFile)
+          .map(stat -> {
+            Metrics metrics = 
OrcMetrics.fromInputFile(HadoopInputFile.fromPath(stat.getPath(), conf));
+            String partitionKey = spec.fields().stream()
+                .map(PartitionField::name)
+                .map(name -> String.format("%s=%s", name, 
partitionPath.get(name)))
+                .collect(Collectors.joining("/"));
+
+            return DataFiles.builder(spec)
+                .withPath(stat.getPath().toString())
+                .withFormat("orc")
+                .withFileSizeInBytes(stat.getLen())
+                .withMetrics(metrics)
+                .withPartitionPath(partitionKey)
+                .build();
+
+          }).collect(Collectors.toList());
+    } catch (IOException e) {
+      throw SparkExceptionUtil.toUncheckedException(e, "Unable to list files 
in partition: %s", partitionUri);
+    }
+  }
+
+  private static SparkPartition toSparkPartition(CatalogTablePartition 
partition, CatalogTable table) {
+    Option<URI> locationUri = partition.storage().locationUri();
+    Option<String> serde = partition.storage().serde();
+
+    Preconditions.checkArgument(locationUri.nonEmpty(), "Partition URI should 
be defined");
+    Preconditions.checkArgument(serde.nonEmpty() || 
table.provider().nonEmpty(),
+        "Partition format should be defined");
+
+    String uri = String.valueOf(locationUri.get());
+    String format = serde.nonEmpty() ? serde.get() : table.provider().get();
+
+    Map<String, String> partitionSpec = 
JavaConverters.mapAsJavaMapConverter(partition.spec()).asJava();
+    return new SparkPartition(partitionSpec, uri, format);
+  }
+
+  private static Expression resolveAttrs(SparkSession spark, String table, 
Expression expr) {
+    Function2<String, String, Object> resolver = 
spark.sessionState().analyzer().resolver();
+    LogicalPlan plan = spark.table(table).queryExecution().analyzed();
+    return expr.transform(new AbstractPartialFunction<Expression, 
Expression>() {
+      @Override
+      public Expression apply(Expression attr) {
+        UnresolvedAttribute unresolvedAttribute = (UnresolvedAttribute) attr;
+        Option<NamedExpression> namedExpressionOption = 
plan.resolve(unresolvedAttribute.nameParts(), resolver);
+        if (namedExpressionOption.isDefined()) {
+          return (Expression) namedExpressionOption.get();
+        } else {
+          throw new IllegalArgumentException(
+              String.format("Could not resolve %s using columns: %s", attr, 
plan.output()));
+        }
+      }
+
+      @Override
+      public boolean isDefinedAt(Expression attr) {
+        return attr instanceof UnresolvedAttribute;
+      }
+    });
+  }
+
+  private static Iterator<ManifestFile> 
buildManifest(SerializableConfiguration conf, PartitionSpec spec,
+                                                      String basePath, 
Iterator<Tuple2<String, DataFile>> fileTuples) {
+    if (fileTuples.hasNext()) {
+      FileIO io = new HadoopFileIO(conf.get());
+      TaskContext ctx = TaskContext.get();
+      String suffix = String.format("stage-%d-task-%d-manifest", 
ctx.stageId(), ctx.taskAttemptId());
+      Path location = new Path(basePath, suffix);
+      String outputPath = FileFormat.AVRO.addExtension(location.toString());
+      OutputFile outputFile = io.newOutputFile(outputPath);
+      ManifestWriter<DataFile> writer = ManifestFiles.write(spec, outputFile);
+
+      try (ManifestWriter<DataFile> writerRef = writer) {
+        fileTuples.forEachRemaining(fileTuple -> writerRef.add(fileTuple._2));
+      } catch (IOException e) {
+        throw SparkExceptionUtil.toUncheckedException(e, "Unable to close the 
manifest writer: %s", outputPath);
+      }
+
+      ManifestFile manifestFile = writer.toManifestFile();
+      return ImmutableList.of(manifestFile).iterator();
+    } else {
+      return Collections.emptyIterator();
+    }
+  }
+
+  /**
+   * Import files from an existing Spark table to an Iceberg table.
+   *
+   * The import uses the Spark session to get table metadata. It assumes no
+   * operation is going on the original and target table and thus is not
+   * thread-safe.
+   *
+   * @param spark a Spark session
+   * @param sourceTableIdent an identifier of the source Spark table
+   * @param targetTable an Iceberg table where to import the data
+   * @param stagingDir a staging directory to store temporary manifest files
+   */
+  public static void importSparkTable(
+      SparkSession spark, TableIdentifier sourceTableIdent, Table targetTable, 
String stagingDir) {
+    SessionCatalog catalog = spark.sessionState().catalog();
+
+    String db = sourceTableIdent.database().nonEmpty() ?
+        sourceTableIdent.database().get() :
+        catalog.getCurrentDatabase();
+    TableIdentifier sourceTableIdentWithDB = new 
TableIdentifier(sourceTableIdent.table(), Some.apply(db));
+
+    if (!catalog.tableExists(sourceTableIdentWithDB)) {
+      throw new org.apache.iceberg.exceptions.NoSuchTableException(
+          String.format("Table %s does not exist", sourceTableIdentWithDB));
+    }
+
+    try {
+      PartitionSpec spec = SparkSchemaUtil.specForTable(spark, 
sourceTableIdentWithDB.unquotedString());
+
+      if (spec == PartitionSpec.unpartitioned()) {
+        importUnpartitionedSparkTable(spark, sourceTableIdentWithDB, 
targetTable);
+      } else {
+        List<SparkPartition> sourceTablePartitions = getPartitions(spark, 
sourceTableIdent);
+        importSparkPartitions(spark, sourceTablePartitions, targetTable, spec, 
stagingDir);
+      }
+    } catch (AnalysisException e) {
+      throw SparkExceptionUtil.toUncheckedException(
+          e, "Unable to get partition spec for table: %s", 
sourceTableIdentWithDB);
+    }
+  }
+
+  private static void importUnpartitionedSparkTable(
+      SparkSession spark, TableIdentifier sourceTableIdent, Table targetTable) 
{
+    try {
+      CatalogTable sourceTable = 
spark.sessionState().catalog().getTableMetadata(sourceTableIdent);
+      Option<String> format =
+          sourceTable.storage().serde().nonEmpty() ? 
sourceTable.storage().serde() : sourceTable.provider();
+      Preconditions.checkArgument(format.nonEmpty(), "Could not determine 
table format");
+
+      Map<String, String> partition = Collections.emptyMap();
+      PartitionSpec spec = PartitionSpec.unpartitioned();
+      Configuration conf = spark.sessionState().newHadoopConf();
+      MetricsConfig metricsConfig = 
MetricsConfig.fromProperties(targetTable.properties());
+
+      List<DataFile> files = listPartition(
+          partition, sourceTable.location().toString(), format.get(), spec, 
conf, metricsConfig);
+
+      AppendFiles append = targetTable.newAppend();
+      files.forEach(append::appendFile);
+      append.commit();
+    } catch (NoSuchDatabaseException e) {
+      throw SparkExceptionUtil.toUncheckedException(
+          e, "Unknown table: %s. Database not found in catalog.", 
sourceTableIdent);
+    } catch (NoSuchTableException e) {
+      throw SparkExceptionUtil.toUncheckedException(
+          e, "Unknown table: %s. Table not found in catalog.", 
sourceTableIdent);
+    }
+  }
+
+  /**
+   * Import files from given partitions to an Iceberg table.
+   *
+   * @param spark a Spark session
+   * @param partitions partitions to import
+   * @param targetTable an Iceberg table where to import the data
+   * @param spec a partition spec
+   * @param stagingDir a staging directory to store temporary manifest files
+   */
+  public static void importSparkPartitions(
+      SparkSession spark, List<SparkPartition> partitions, Table targetTable, 
PartitionSpec spec, String stagingDir) {
+    Configuration conf = spark.sessionState().newHadoopConf();
+    SerializableConfiguration serializableConf = new 
SerializableConfiguration(conf);
+    int parallelism = Math.min(partitions.size(), 
spark.sessionState().conf().parallelPartitionDiscoveryParallelism());
+    int numShufflePartitions = 
spark.sessionState().conf().numShufflePartitions();
+    MetricsConfig metricsConfig = 
MetricsConfig.fromProperties(targetTable.properties());
+
+    JavaSparkContext sparkContext = 
JavaSparkContext.fromSparkContext(spark.sparkContext());
+    JavaRDD<SparkPartition> partitionRDD = 
sparkContext.parallelize(partitions, parallelism);
+
+    Dataset<SparkPartition> partitionDS = spark.createDataset(
+        partitionRDD.rdd(),
+        Encoders.javaSerialization(SparkPartition.class));
+
+    List<ManifestFile> manifests = partitionDS
+        .flatMap((FlatMapFunction<SparkPartition, DataFile>) sparkPartition ->
+                listPartition(sparkPartition, spec, serializableConf, 
metricsConfig).iterator(),
+            Encoders.javaSerialization(DataFile.class))
+        .repartition(numShufflePartitions)
+        .map((MapFunction<DataFile, Tuple2<String, DataFile>>) file ->
+                Tuple2.apply(file.path().toString(), file),
+            Encoders.tuple(Encoders.STRING(), 
Encoders.javaSerialization(DataFile.class)))
+        .orderBy(col("_1"))
+        .mapPartitions(
+            (MapPartitionsFunction<Tuple2<String, DataFile>, ManifestFile>) 
fileTuple ->
+                buildManifest(serializableConf, spec, stagingDir, fileTuple),
+            Encoders.javaSerialization(ManifestFile.class))
+        .collectAsList();
+
+    try {
+      boolean snapshotIdInheritanceEnabled = PropertyUtil.propertyAsBoolean(
+          targetTable.properties(),
+          TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
+          TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT);
+
+      AppendFiles append = targetTable.newAppend();
+      manifests.forEach(append::appendManifest);
+      append.commit();
+
+      if (!snapshotIdInheritanceEnabled) {
+        // delete original manifests as they were rewritten before the commit
+        deleteManifests(targetTable.io(), manifests);
+      }
+    } catch (Throwable e) {
+      deleteManifests(targetTable.io(), manifests);
+      throw e;
+    }
+  }
+
+  private static void deleteManifests(FileIO io, List<ManifestFile> manifests) 
{
+    Tasks.foreach(manifests)
+        .noRetry()
+        .suppressFailureWhenFinished()
+        .run(item -> io.deleteFile(item.path()));
+  }
+
+  /**
+   * Class representing a table partition.
+   */
+  public static class SparkPartition implements Serializable {
+    private final Map<String, String> values;
+    private final String uri;
+    private final String format;
+
+    public SparkPartition(Map<String, String> values, String uri, String 
format) {
+      this.values = ImmutableMap.copyOf(values);
+      this.uri = uri;
+      this.format = format;
+    }
+
+    public Map<String, String> getValues() {
+      return values;
+    }
+
+    public String getUri() {
+      return uri;
+    }
+
+    public String getFormat() {
+      return format;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("values", values)
+          .add("uri", uri)
+          .add("format", format)
+          .toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      SparkPartition that = (SparkPartition) o;
+      return Objects.equal(values, that.values) &&
+          Objects.equal(uri, that.uri) &&
+          Objects.equal(format, that.format);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(values, uri, format);
+    }
+  }
+}
diff --git 
a/spark2/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala 
b/spark2/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
deleted file mode 100644
index 55ceda2..0000000
--- a/spark2/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
+++ /dev/null
@@ -1,457 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.spark
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, PathFilter}
-import org.apache.iceberg.{DataFile, DataFiles, FileFormat, ManifestFile, 
ManifestFiles}
-import org.apache.iceberg.{Metrics, MetricsConfig, PartitionSpec, Table, 
TableProperties}
-import org.apache.iceberg.exceptions.NoSuchTableException
-import org.apache.iceberg.hadoop.{HadoopFileIO, HadoopInputFile, 
SerializableConfiguration}
-import org.apache.iceberg.orc.OrcMetrics
-import org.apache.iceberg.parquet.ParquetUtil
-import org.apache.iceberg.relocated.com.google.common.collect.Maps
-import org.apache.iceberg.util.PropertyUtil
-import org.apache.parquet.hadoop.ParquetFileReader
-import org.apache.spark.TaskContext
-import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
CatalogTablePartition}
-import org.apache.spark.sql.catalyst.expressions.Expression
-import scala.collection.JavaConverters._
-import scala.util.Try
-
-object SparkTableUtil {
-  /**
-   * Returns a DataFrame with a row for each partition in the table.
-   *
-   * The DataFrame has 3 columns, partition key (a=1/b=2), partition location, 
and format
-   * (avro or parquet).
-   *
-   * @param spark a Spark session
-   * @param table a table name and (optional) database
-   * @return a DataFrame of the table's partitions
-   */
-  def partitionDF(spark: SparkSession, table: String): DataFrame = {
-    import spark.implicits._
-
-    val partitions = getPartitions(spark, table)
-    partitions.toDF("partition", "uri", "format")
-  }
-
-  /**
-    * Returns a DataFrame with a row for each partition that matches the 
specified 'expression'.
-    *
-    * @param spark a Spark session.
-    * @param table name of the table.
-    * @param expression The expression whose matching partitions are returned.
-    * @return a DataFrame of the table partitions.
-    */
-  def partitionDFByFilter(spark: SparkSession, table: String, expression: 
String): DataFrame = {
-    import spark.implicits._
-
-    val partitions = getPartitionsByFilter(spark, table, expression)
-    partitions.toDF("partition", "uri", "format")
-  }
-
-  /**
-   * Returns all partitions in the table.
-   *
-   * @param spark a Spark session
-   * @param table a table name and (optional) database
-   * @return all table's partitions
-   */
-  def getPartitions(spark: SparkSession, table: String): Seq[SparkPartition] = 
{
-    val tableIdent = spark.sessionState.sqlParser.parseTableIdentifier(table)
-    getPartitions(spark, tableIdent)
-  }
-
-  /**
-   * Returns all partitions in the table.
-   *
-   * @param spark a Spark session
-   * @param tableIdent a table identifier
-   * @return all table's partitions
-   */
-  def getPartitions(spark: SparkSession, tableIdent: TableIdentifier): 
Seq[SparkPartition] = {
-    val catalog = spark.sessionState.catalog
-    val catalogTable = catalog.getTableMetadata(tableIdent)
-
-    catalog
-      .listPartitions(tableIdent)
-      .map(catalogPartition => toSparkPartition(catalogPartition, 
catalogTable))
-  }
-
-  /**
-   * Returns partitions that match the specified 'predicate'.
-   *
-   * @param spark a Spark session
-   * @param table a table name and (optional) database
-   * @param predicate a predicate on partition columns
-   * @return matching table's partitions
-   */
-  def getPartitionsByFilter(spark: SparkSession, table: String, predicate: 
String): Seq[SparkPartition] = {
-    val tableIdent = spark.sessionState.sqlParser.parseTableIdentifier(table)
-    val unresolvedPredicateExpr = 
spark.sessionState.sqlParser.parseExpression(predicate)
-    val resolvedPredicateExpr = resolveAttrs(spark, table, 
unresolvedPredicateExpr)
-    getPartitionsByFilter(spark, tableIdent, resolvedPredicateExpr)
-  }
-
-  /**
-   * Returns partitions that match the specified 'predicate'.
-   *
-   * @param spark a Spark session
-   * @param tableIdent a table identifier
-   * @param predicateExpr a predicate expression on partition columns
-   * @return matching table's partitions
-   */
-  def getPartitionsByFilter(
-      spark: SparkSession,
-      tableIdent: TableIdentifier,
-      predicateExpr: Expression): Seq[SparkPartition] = {
-
-    val catalog = spark.sessionState.catalog
-    val catalogTable = catalog.getTableMetadata(tableIdent)
-
-    val resolvedPredicateExpr = if (!predicateExpr.resolved) {
-      resolveAttrs(spark, tableIdent.quotedString, predicateExpr)
-    } else {
-      predicateExpr
-    }
-
-    catalog
-      .listPartitionsByFilter(tableIdent, Seq(resolvedPredicateExpr))
-      .map(catalogPartition => toSparkPartition(catalogPartition, 
catalogTable))
-  }
-
-  /**
-   * Returns the data files in a partition by listing the partition location.
-   *
-   * For Parquet and ORC partitions, this will read metrics from the file 
footer. For Avro partitions,
-   * metrics are set to null.
-   *
-   * @param partition a partition
-   * @param conf a serializable Hadoop conf
-   * @param metricsConfig a metrics conf
-   * @return a Seq of [[DataFile]]
-   */
-  def listPartition(
-      partition: SparkPartition,
-      spec: PartitionSpec,
-      conf: SerializableConfiguration,
-      metricsConfig: MetricsConfig): Seq[DataFile] = {
-
-    listPartition(partition.values, partition.uri, partition.format, spec, 
conf.get(), metricsConfig)
-  }
-
-  /**
-   * Returns the data files in a partition by listing the partition location.
-   *
-   * For Parquet and ORC partitions, this will read metrics from the file 
footer. For Avro partitions,
-   * metrics are set to null.
-   *
-   * @param partition partition key, e.g., "a=1/b=2"
-   * @param uri partition location URI
-   * @param format partition format, avro or parquet
-   * @param conf a Hadoop conf
-   * @param metricsConfig a metrics conf
-   * @return a seq of [[DataFile]]
-   */
-  def listPartition(
-      partition: Map[String, String],
-      uri: String,
-      format: String,
-      spec: PartitionSpec,
-      conf: Configuration = new Configuration(),
-      metricsConfig: MetricsConfig = MetricsConfig.getDefault): Seq[DataFile] 
= {
-
-    if (format.contains("avro")) {
-      listAvroPartition(partition, uri, spec, conf)
-    } else if (format.contains("parquet")) {
-      listParquetPartition(partition, uri, spec, conf, metricsConfig)
-    } else if (format.contains("orc")) {
-      // TODO: use MetricsConfig in listOrcPartition
-      listOrcPartition(partition, uri, spec, conf)
-    } else {
-      throw new UnsupportedOperationException(s"Unknown partition format: 
$format")
-    }
-  }
-
-  /**
-   * Case class representing a table partition.
-   */
-  case class SparkPartition(values: Map[String, String], uri: String, format: 
String)
-
-  private def arrayToMap(arr: Array[Long]): java.util.Map[Integer, 
java.lang.Long] = {
-    if (arr != null) {
-      val map: java.util.Map[Integer, java.lang.Long] = Maps.newHashMap()
-      arr.zipWithIndex.foreach {
-        case (-1, _) => // skip default values
-        case (value, index) => map.put(index, value)
-      }
-      map
-    } else {
-      null
-    }
-  }
-
-  private object HiddenPathFilter extends PathFilter {
-    override def accept(p: Path): Boolean = {
-      !p.getName.startsWith("_") && !p.getName.startsWith(".")
-    }
-  }
-
-  private def listAvroPartition(
-      partitionPath: Map[String, String],
-      partitionUri: String,
-      spec: PartitionSpec,
-      conf: Configuration): Seq[DataFile] = {
-    val partition = new Path(partitionUri)
-    val fs = partition.getFileSystem(conf)
-
-    fs.listStatus(partition, HiddenPathFilter).filter(_.isFile).map { stat =>
-      val metrics = new Metrics(-1L, arrayToMap(null), arrayToMap(null), 
arrayToMap(null))
-      val partitionKey = spec.fields.asScala.map(_.name).map { name =>
-        s"$name=${partitionPath(name)}"
-      }.mkString("/")
-
-      DataFiles.builder(spec)
-        .withPath(stat.getPath.toString)
-        .withFormat("avro")
-        .withFileSizeInBytes(stat.getLen)
-        .withMetrics(metrics)
-        .withPartitionPath(partitionKey)
-        .build()
-    }
-  }
-
-  //noinspection ScalaDeprecation
-  private def listParquetPartition(
-      partitionPath: Map[String, String],
-      partitionUri: String,
-      spec: PartitionSpec,
-      conf: Configuration,
-      metricsSpec: MetricsConfig): Seq[DataFile] = {
-    val partition = new Path(partitionUri)
-    val fs = partition.getFileSystem(conf)
-
-    fs.listStatus(partition, HiddenPathFilter).filter(_.isFile).map { stat =>
-      val metrics = 
ParquetUtil.footerMetrics(ParquetFileReader.readFooter(conf, stat), metricsSpec)
-      val partitionKey = spec.fields.asScala.map(_.name).map { name =>
-        s"$name=${partitionPath(name)}"
-      }.mkString("/")
-
-      DataFiles.builder(spec)
-        .withPath(stat.getPath.toString)
-        .withFormat("parquet")
-        .withFileSizeInBytes(stat.getLen)
-        .withMetrics(metrics)
-        .withPartitionPath(partitionKey)
-        .build()
-    }
-  }
-
-  private def listOrcPartition(
-      partitionPath: Map[String, String],
-      partitionUri: String,
-      spec: PartitionSpec,
-      conf: Configuration): Seq[DataFile] = {
-    val partition = new Path(partitionUri)
-    val fs = partition.getFileSystem(conf)
-
-    fs.listStatus(partition, HiddenPathFilter).filter(_.isFile).map { stat =>
-      val metrics = 
OrcMetrics.fromInputFile(HadoopInputFile.fromPath(stat.getPath, conf))
-      val partitionKey = spec.fields.asScala.map(_.name).map { name =>
-        s"$name=${partitionPath(name)}"
-      }.mkString("/")
-
-      DataFiles.builder(spec)
-        .withPath(stat.getPath.toString)
-        .withFormat("orc")
-        .withFileSizeInBytes(stat.getLen)
-        .withMetrics(metrics)
-        .withPartitionPath(partitionKey)
-        .build()
-    }
-  }
-
-  private def toSparkPartition(partition: CatalogTablePartition, table: 
CatalogTable): SparkPartition = {
-    val uri = partition.storage.locationUri.map(String.valueOf(_))
-    require(uri.nonEmpty, "Partition URI should be defined")
-
-    val format = partition.storage.serde.orElse(table.provider)
-    require(format.nonEmpty, "Partition format should be defined")
-
-    SparkPartition(partition.spec, uri.get, format.get)
-  }
-
-  private def resolveAttrs(spark: SparkSession, table: String, expr: 
Expression): Expression = {
-    val resolver = spark.sessionState.analyzer.resolver
-    val plan = spark.table(table).queryExecution.analyzed
-    expr.transform {
-      case attr: UnresolvedAttribute =>
-        plan.resolve(attr.nameParts, resolver) match {
-          case Some(resolvedAttr) => resolvedAttr
-          case None => throw new IllegalArgumentException(s"Could not resolve 
$attr using columns: ${plan.output}")
-        }
-    }
-  }
-
-  private def buildManifest(
-      conf: SerializableConfiguration,
-      spec: PartitionSpec,
-      basePath: String): Iterator[DataFile] => Iterator[ManifestFile] = { 
files =>
-    if (files.hasNext) {
-      val io = new HadoopFileIO(conf.get())
-      val ctx = TaskContext.get()
-      val location = new Path(basePath, 
s"stage-${ctx.stageId()}-task-${ctx.taskAttemptId()}-manifest")
-      val outputFile = 
io.newOutputFile(FileFormat.AVRO.addExtension(location.toString))
-      val writer = ManifestFiles.write(spec, outputFile)
-      try {
-        files.foreach(writer.add)
-      } finally {
-        writer.close()
-      }
-
-      val manifestFile = writer.toManifestFile
-      Seq(manifestFile).iterator
-    } else {
-      Seq.empty.iterator
-    }
-  }
-
-  /**
-   * Import files from an existing Spark table to an Iceberg table.
-   *
-   * The import uses the Spark session to get table metadata. It assumes no
-   * operation is going on the original and target table and thus is not
-   * thread-safe.
-   *
-   * @param spark a Spark session
-   * @param sourceTableIdent an identifier of the source Spark table
-   * @param targetTable an Iceberg table where to import the data
-   * @param stagingDir a staging directory to store temporary manifest files
-   */
-  def importSparkTable(
-      spark: SparkSession,
-      sourceTableIdent: TableIdentifier,
-      targetTable: Table,
-      stagingDir: String): Unit = {
-
-    val catalog = spark.sessionState.catalog
-
-    val db = sourceTableIdent.database.getOrElse(catalog.getCurrentDatabase)
-    val sourceTableIdentWithDB = sourceTableIdent.copy(database = Some(db))
-
-    if (!catalog.tableExists(sourceTableIdentWithDB)) {
-      throw new NoSuchTableException(s"Table $sourceTableIdentWithDB does not 
exist")
-    }
-
-    val spec = SparkSchemaUtil.specForTable(spark, 
sourceTableIdentWithDB.unquotedString)
-
-    if (spec == PartitionSpec.unpartitioned) {
-      importUnpartitionedSparkTable(spark, sourceTableIdentWithDB, targetTable)
-    } else {
-      val sourceTablePartitions = getPartitions(spark, sourceTableIdent)
-      importSparkPartitions(spark, sourceTablePartitions, targetTable, spec, 
stagingDir)
-    }
-  }
-
-  private def importUnpartitionedSparkTable(
-      spark: SparkSession,
-      sourceTableIdent: TableIdentifier,
-      targetTable: Table): Unit = {
-
-    val sourceTable = 
spark.sessionState.catalog.getTableMetadata(sourceTableIdent)
-    val format = sourceTable.storage.serde.orElse(sourceTable.provider)
-    require(format.nonEmpty, "Could not determine table format")
-
-    val partition = Map.empty[String, String]
-    val spec = PartitionSpec.unpartitioned()
-    val conf = spark.sessionState.newHadoopConf()
-    val metricsConfig = MetricsConfig.fromProperties(targetTable.properties)
-
-    val files = listPartition(partition, sourceTable.location.toString, 
format.get, spec, conf, metricsConfig)
-
-    val append = targetTable.newAppend()
-    files.foreach(append.appendFile)
-    append.commit()
-  }
-
-  /**
-   * Import files from given partitions to an Iceberg table.
-   *
-   * @param spark a Spark session
-   * @param partitions partitions to import
-   * @param targetTable an Iceberg table where to import the data
-   * @param spec a partition spec
-   * @param stagingDir a staging directory to store temporary manifest files
-   */
-  def importSparkPartitions(
-      spark: SparkSession,
-      partitions: Seq[SparkPartition],
-      targetTable: Table,
-      spec: PartitionSpec,
-      stagingDir: String): Unit = {
-
-    implicit val manifestFileEncoder: Encoder[ManifestFile] = 
Encoders.javaSerialization[ManifestFile]
-    implicit val dataFileEncoder: Encoder[DataFile] = 
Encoders.javaSerialization[DataFile]
-    implicit val pathDataFileEncoder: Encoder[(String, DataFile)] = 
Encoders.tuple(Encoders.STRING, dataFileEncoder)
-
-    import spark.implicits._
-
-    val conf = spark.sessionState.newHadoopConf()
-    val serializableConf = new SerializableConfiguration(conf)
-    val parallelism = Math.min(partitions.size, 
spark.sessionState.conf.parallelPartitionDiscoveryParallelism)
-    val partitionDS = spark.sparkContext.parallelize(partitions, 
parallelism).toDS()
-    val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
-    val metricsConfig = MetricsConfig.fromProperties(targetTable.properties)
-
-    val manifests = partitionDS
-      .flatMap(partition => listPartition(partition, spec, serializableConf, 
metricsConfig))
-      .repartition(numShufflePartitions)
-      .map(file => (file.path.toString, file))
-      .orderBy($"_1")
-      .mapPartitions(files => buildManifest(serializableConf, spec, 
stagingDir)(files.map(_._2)))
-      .collect()
-
-    try {
-      val snapshotIdInheritanceEnabled = PropertyUtil.propertyAsBoolean(
-        targetTable.properties,
-        TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
-        TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT)
-
-      val append = targetTable.newAppend()
-      manifests.foreach(manifest => append.appendManifest(manifest))
-      append.commit()
-
-      if (!snapshotIdInheritanceEnabled) {
-        // delete original manifests as they were rewritten before the commit
-        manifests.foreach(manifest => 
Try(targetTable.io.deleteFile(manifest.path)))
-      }
-    } catch {
-      case e: Throwable =>
-        // always clean up created manifests if the append fails
-        manifests.foreach(manifest => 
Try(targetTable.io.deleteFile(manifest.path)))
-        throw e;
-    }
-  }
-}
diff --git 
a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java 
b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
index 37f57d4..b73c036 100644
--- 
a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
+++ 
b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
@@ -52,7 +52,6 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import scala.collection.Seq;
 
 import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
 import static org.apache.iceberg.TableProperties.PARQUET_VECTORIZATION_ENABLED;
@@ -132,7 +131,7 @@ public class TestSparkTableUtil extends HiveTableBaseTest {
 
   @Test
   public void testPartitionScan() {
-    Seq<SparkPartition> partitions = SparkTableUtil.getPartitions(spark, 
qualifiedTableName);
+    List<SparkPartition> partitions = SparkTableUtil.getPartitions(spark, 
qualifiedTableName);
     Assert.assertEquals("There should be 3 partitions", 3, partitions.size());
 
     Dataset<Row> partitionDF = SparkTableUtil.partitionDF(spark, 
qualifiedTableName);
@@ -141,7 +140,7 @@ public class TestSparkTableUtil extends HiveTableBaseTest {
 
   @Test
   public void testPartitionScanByFilter() {
-    Seq<SparkPartition> partitions = 
SparkTableUtil.getPartitionsByFilter(spark, qualifiedTableName, "data = 'a'");
+    List<SparkPartition> partitions = 
SparkTableUtil.getPartitionsByFilter(spark, qualifiedTableName, "data = 'a'");
     Assert.assertEquals("There should be 1 matching partition", 1, 
partitions.size());
 
     Dataset<Row> partitionDF = SparkTableUtil.partitionDFByFilter(spark, 
qualifiedTableName, "data = 'a'");
diff --git 
a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtilWithInMemoryCatalog.java
 
b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtilWithInMemoryCatalog.java
index f3f7e53..2a94690 100644
--- 
a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtilWithInMemoryCatalog.java
+++ 
b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtilWithInMemoryCatalog.java
@@ -46,7 +46,6 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import scala.collection.Seq;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
 
@@ -213,7 +212,7 @@ public class TestSparkTableUtilWithInMemoryCatalog {
           .saveAsTable("parquet_table");
 
       File stagingDir = temp.newFolder("staging-dir");
-      Seq<SparkPartition> partitions = 
SparkTableUtil.getPartitionsByFilter(spark, "parquet_table", "data = 'a'");
+      List<SparkPartition> partitions = 
SparkTableUtil.getPartitionsByFilter(spark, "parquet_table", "data = 'a'");
       SparkTableUtil.importSparkPartitions(spark, partitions, table, 
table.spec(), stagingDir.toString());
 
       List<SimpleRecord> expectedRecords = Lists.newArrayList(new 
SimpleRecord(1, "a"));
@@ -258,7 +257,7 @@ public class TestSparkTableUtilWithInMemoryCatalog {
           .saveAsTable("parquet_table");
 
       File stagingDir = temp.newFolder("staging-dir");
-      Seq<SparkPartition> partitions = 
SparkTableUtil.getPartitionsByFilter(spark, "parquet_table", "data = 'a'");
+      List<SparkPartition> partitions = 
SparkTableUtil.getPartitionsByFilter(spark, "parquet_table", "data = 'a'");
       SparkTableUtil.importSparkPartitions(spark, partitions, table, 
table.spec(), stagingDir.toString());
 
       List<SimpleRecord> expectedRecords = Lists.newArrayList(new 
SimpleRecord(1, "a"));

Reply via email to