This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 42a1aeb  Add ORC support for listPartitions (#210)
42a1aeb is described below

commit 42a1aeb19264aee7cef66293d69c54c3d0f0915c
Author: Edgar Rodriguez <[email protected]>
AuthorDate: Sat Jun 8 13:52:09 2019 -0700

    Add ORC support for listPartitions (#210)
    
    * Add ORC support for listPartitions
    * Add OrcMetrics with row count only
---
 .../java/org/apache/iceberg/orc/OrcMetrics.java    | 60 ++++++++++++++++++++++
 .../org/apache/iceberg/spark/SparkTableUtil.scala  | 29 +++++++++++
 2 files changed, 89 insertions(+)

diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
new file mode 100644
index 0000000..2defc7d
--- /dev/null
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.orc;
+
+import java.io.IOException;
+import java.util.Collections;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.InputFile;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+
+/**
+ * Static helpers that build Iceberg {@link Metrics} for ORC files.
+ *
+ * <p>Only the row count is read from the file for now; the remaining {@link Metrics}
+ * constructor arguments are passed as {@code null} or an empty map.
+ */
+public class OrcMetrics {
+
+  private OrcMetrics() {}
+
+  /**
+   * Builds metrics for an ORC file, picking the Hadoop configuration from the input file.
+   *
+   * @param file the ORC file to inspect
+   * @return metrics for {@code file}
+   */
+  public static Metrics fromInputFile(InputFile file) {
+    // Reuse the configuration carried by a HadoopInputFile; otherwise use a default one.
+    final Configuration config = (file instanceof HadoopInputFile)
+        ? ((HadoopInputFile) file).getConf()
+        : new Configuration();
+    return fromInputFile(file, config);
+  }
+
+  /**
+   * Builds metrics for an ORC file using the given Hadoop configuration.
+   *
+   * @param file the ORC file to inspect; it is opened through {@code file.location()}
+   * @param config the configuration passed to the ORC reader options
+   * @return metrics carrying the number of rows reported by the ORC reader
+   * @throws RuntimeIOException if creating the ORC reader fails with an {@link IOException}
+   */
+  public static Metrics fromInputFile(InputFile file, Configuration config) {
+    try {
+      // NOTE(review): the reader is never closed here; confirm whether the ORC version in
+      // use exposes a close() on Reader and whether it holds resources that need releasing.
+      final Reader orcReader = OrcFile.createReader(new Path(file.location()),
+          OrcFile.readerOptions(config));
+
+      // TODO: implement rest of the methods for ORC metrics
+      return new Metrics(orcReader.getNumberOfRows(),
+          null,
+          null,
+          Collections.emptyMap(),
+          null,
+          null);
+    } catch (IOException ioe) {
+      // Report the location that was actually opened instead of relying on the
+      // InputFile implementation's toString().
+      throw new RuntimeIOException(
+          ioe, "Failed to read footer of file: %s", file.location());
+    }
+  }
+}
diff --git a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
index 86dfbe3..9c95916 100644
--- a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
+++ b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
@@ -32,6 +32,9 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
 import scala.collection.JavaConverters._
 
+import org.apache.iceberg.hadoop.HadoopInputFile
+import org.apache.iceberg.orc.OrcMetrics
+
 object SparkTableUtil {
   /**
    * Returns a DataFrame with a row for each partition in the table.
@@ -73,6 +76,8 @@ object SparkTableUtil {
       listAvroPartition(partition, uri)
     } else if (format.contains("parquet")) {
       listParquetPartition(partition, uri)
+    } else if (format.contains("orc")) {
+      listOrcPartition(partition, uri)
     } else {
       throw new UnsupportedOperationException(s"Unknown partition format: $format")
     }
@@ -248,5 +253,29 @@ object SparkTableUtil {
         bytesMapToArray(metrics.upperBounds))
     }
   }
+
+  /**
+   * Builds a [[SparkDataFile]] for every ORC file found at a partition location.
+   *
+   * The location is listed through its Hadoop file system using `HiddenPathFilter`, and
+   * only entries reported as files are kept. Metrics for each file come from
+   * `OrcMetrics.fromInputFile`.
+   *
+   * @param partitionPath partition values, passed through unchanged to each result
+   * @param partitionUri location of the partition to list
+   * @return one data file entry per listed file, with format "orc"
+   */
+  private def listOrcPartition(
+      partitionPath: Map[String, String],
+      partitionUri: String): Seq[SparkDataFile] = {
+    val hadoopConf = new Configuration()
+    val partitionDir = new Path(partitionUri)
+    val fileSystem = partitionDir.getFileSystem(hadoopConf)
+    val orcFiles = fileSystem.listStatus(partitionDir, HiddenPathFilter).filter(_.isFile)
+
+    orcFiles.map { status =>
+      val filePath = status.getPath
+      val metrics = OrcMetrics.fromInputFile(HadoopInputFile.fromPath(filePath, hadoopConf))
+
+      SparkDataFile(
+        filePath.toString,
+        partitionPath,
+        "orc",
+        status.getLen,
+        status.getBlockSize,
+        metrics.recordCount,
+        mapToArray(metrics.columnSizes),
+        mapToArray(metrics.valueCounts),
+        mapToArray(metrics.nullValueCounts),
+        bytesMapToArray(metrics.lowerBounds),
+        bytesMapToArray(metrics.upperBounds)
+      )
+    }
+  }
 }
 

Reply via email to