This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 42a1aeb Add ORC support for listPartitions (#210)
42a1aeb is described below
commit 42a1aeb19264aee7cef66293d69c54c3d0f0915c
Author: Edgar Rodriguez <[email protected]>
AuthorDate: Sat Jun 8 13:52:09 2019 -0700
Add ORC support for listPartitions (#210)
* Add ORC support for listPartitions
* Add OrcMetrics with row count only
---
.../java/org/apache/iceberg/orc/OrcMetrics.java | 60 ++++++++++++++++++++++
.../org/apache/iceberg/spark/SparkTableUtil.scala | 29 +++++++++++
2 files changed, 89 insertions(+)
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
new file mode 100644
index 0000000..2defc7d
--- /dev/null
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.orc;
+
+import java.io.IOException;
+import java.util.Collections;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.InputFile;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+
+public class OrcMetrics {
+
+ private OrcMetrics() {}
+
+ public static Metrics fromInputFile(InputFile file) {
+ final Configuration config = (file instanceof HadoopInputFile)
+ ? ((HadoopInputFile)file).getConf()
+ : new Configuration();
+ return fromInputFile(file, config);
+ }
+
+ public static Metrics fromInputFile(InputFile file, Configuration config) {
+ try {
+ final Reader orcReader = OrcFile.createReader(new Path(file.location()),
+ OrcFile.readerOptions(config));
+
+ // TODO: implement rest of the methods for ORC metrics
+ return new Metrics(orcReader.getNumberOfRows(),
+ null,
+ null,
+ Collections.emptyMap(),
+ null,
+ null);
+ } catch (IOException ioe) {
+ throw new RuntimeIOException(ioe, "Failed to read footer of file: %s", file);
+ }
+ }
+}
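For reference, a minimal usage sketch of the new class (the ORC file path below is hypothetical, and per the TODO above only the row count is populated at this stage):

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopInputFile
    import org.apache.iceberg.orc.OrcMetrics

    // Hypothetical location; substitute any readable ORC data file.
    val conf = new Configuration()
    val file = HadoopInputFile.fromLocation("hdfs://nn/warehouse/db/tbl/part=0/data.orc", conf)
    val metrics = OrcMetrics.fromInputFile(file)
    // Only recordCount is implemented so far; column-level metrics are null/empty.
    println(s"rows = ${metrics.recordCount}")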
diff --git a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
index 86dfbe3..9c95916 100644
--- a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
+++ b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
@@ -32,6 +32,9 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import scala.collection.JavaConverters._
+import org.apache.iceberg.hadoop.HadoopInputFile
+import org.apache.iceberg.orc.OrcMetrics
+
object SparkTableUtil {
/**
* Returns a DataFrame with a row for each partition in the table.
@@ -73,6 +76,8 @@ object SparkTableUtil {
listAvroPartition(partition, uri)
} else if (format.contains("parquet")) {
listParquetPartition(partition, uri)
+ } else if (format.contains("orc")) {
+ listOrcPartition(partition, uri)
} else {
throw new UnsupportedOperationException(s"Unknown partition format: $format")
}
@@ -248,5 +253,29 @@ object SparkTableUtil {
bytesMapToArray(metrics.upperBounds))
}
}
+
+ private def listOrcPartition(
+ partitionPath: Map[String, String],
+ partitionUri: String): Seq[SparkDataFile] = {
+ val conf = new Configuration()
+ val partition = new Path(partitionUri)
+ val fs = partition.getFileSystem(conf)
+
+ fs.listStatus(partition, HiddenPathFilter).filter(_.isFile).map { stat =>
+ val metrics = OrcMetrics.fromInputFile(HadoopInputFile.fromPath(stat.getPath, conf))
+
+ SparkDataFile(
+ stat.getPath.toString,
+ partitionPath, "orc", stat.getLen,
+ stat.getBlockSize,
+ metrics.recordCount,
+ mapToArray(metrics.columnSizes),
+ mapToArray(metrics.valueCounts),
+ mapToArray(metrics.nullValueCounts),
+ bytesMapToArray(metrics.lowerBounds()),
+ bytesMapToArray(metrics.upperBounds())
+ )
+ }
+ }
}
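For context, a hedged sketch of how the new branch is exercised end to end. The entry point name partitionDF is inferred from the scaladoc context in the hunk above ("Returns a DataFrame with a row for each partition in the table."), and the table name is hypothetical:

    import org.apache.iceberg.spark.SparkTableUtil

    // Assumes an active SparkSession named `spark`.
    // "db.orc_table" is a hypothetical Hive table whose partitions use ORC storage.
    // With this change, such partitions dispatch to listOrcPartition instead of
    // failing with "Unknown partition format: orc".
    val partitions = SparkTableUtil.partitionDF(spark, "db.orc_table")
    partitions.show()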