This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new ca468d5 Fix NPE when counting entries (#1077)
ca468d5 is described below
commit ca468d569231e2cd5d502003586d5a49f53de470
Author: Xiang Li <[email protected]>
AuthorDate: Fri May 29 22:36:46 2020 +0800
Fix NPE when counting entries (#1077)
Closes #1077
---
api/src/main/java/org/apache/iceberg/Schema.java | 14 ++++++++++++-
.../java/org/apache/iceberg/AllEntriesTable.java | 4 +++-
.../org/apache/iceberg/ManifestEntriesTable.java | 4 +++-
.../spark/source/TestIcebergSourceTablesBase.java | 24 ++++++++++++++++++++++
4 files changed, 43 insertions(+), 3 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/Schema.java b/api/src/main/java/org/apache/iceberg/Schema.java
index 86a89c0..0ecbfd4 100644
--- a/api/src/main/java/org/apache/iceberg/Schema.java
+++ b/api/src/main/java/org/apache/iceberg/Schema.java
@@ -124,9 +124,21 @@ public class Schema implements Serializable {
return struct.fields();
}
+ /**
+ * Returns the {@link Type} of a sub-field identified by the field name.
+ *
+ * @param name a field name
+ * @return a Type for the sub-field or null if it is not found
+ */
public Type findType(String name) {
Preconditions.checkArgument(!name.isEmpty(), "Invalid column name: (empty)");
- return findType(lazyNameToId().get(name));
+ Integer id = lazyNameToId().get(name);
+ if (id != null) { // name is found
+ return findType(id);
+ }
+
+ // name could not be found
+ return null;
}
/**
diff --git a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
index bb19c41..1ab918d 100644
--- a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.ThreadPools;
@@ -100,7 +101,8 @@ public class AllEntriesTable extends BaseMetadataTable {
TableOperations ops, Snapshot snapshot, Expression rowFilter,
boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
CloseableIterable<ManifestFile> manifests = allManifestFiles(ops.current().snapshots());
- Schema fileSchema = new Schema(schema().findType("data_file").asStructType().fields());
+ Type fileProjection = schema().findType("data_file");
+ Schema fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema();
String schemaString = SchemaParser.toJson(schema());
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
index e4288c0..3c43fc4 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -26,6 +26,7 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
/**
@@ -97,7 +98,8 @@ public class ManifestEntriesTable extends BaseMetadataTable {
boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
// return entries from both data and delete manifests
CloseableIterable<ManifestFile> manifests = CloseableIterable.withNoopClose(snapshot.allManifests());
- Schema fileSchema = new Schema(schema().findType("data_file").asStructType().fields());
+ Type fileProjection = schema().findType("data_file");
+ Schema fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema();
String schemaString = SchemaParser.toJson(schema());
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index c8a72b5..def72d0 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -198,6 +198,30 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
}
@Test
+ public void testCountEntriesTable() {
+ TableIdentifier tableIdentifier = TableIdentifier.of("db",
"count_entries_test");
+ createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+
+ // init load
+ List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
+ Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
+ inputDf.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(loadLocation(tableIdentifier));
+
+ final int expectedEntryCount = 1;
+
+ // count entries
+ Assert.assertEquals("Count should return " + expectedEntryCount,
+     expectedEntryCount, spark.read().format("iceberg").load(loadLocation(tableIdentifier, "entries")).count());
+
+ // count all_entries
+ Assert.assertEquals("Count should return " + expectedEntryCount,
+     expectedEntryCount, spark.read().format("iceberg").load(loadLocation(tableIdentifier, "all_entries")).count());
+ }
+
+ @Test
public void testFilesTable() throws Exception {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());