This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.9.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 768dc08a2ce558ea309215fa1c864ec0c48a4e72
Author: openinx <[email protected]>
AuthorDate: Fri Aug 7 23:58:24 2020 +0800

    ORC: Fix decimal and timestamp bugs (#1271)
---
 build.gradle                                       |   1 +
 .../apache/iceberg/data/orc/GenericOrcWriter.java  |  14 +-
 .../apache/iceberg/spark/data/SparkOrcReader.java  |   7 +-
 .../iceberg/spark/data/SparkOrcValueReaders.java   |  32 ++++-
 .../apache/iceberg/spark/data/SparkOrcWriter.java  |  16 ++-
 .../apache/iceberg/spark/data/GenericsHelpers.java |   2 +-
 .../spark/data/TestSparkRecordOrcReaderWriter.java | 148 +++++++++++++++++++++
 7 files changed, 199 insertions(+), 21 deletions(-)

diff --git a/build.gradle b/build.gradle
index 1879e80..4e8de83 100644
--- a/build.gradle
+++ b/build.gradle
@@ -451,6 +451,7 @@ project(':iceberg-spark') {
     }
     testCompile project(path: ':iceberg-hive', configuration: 'testArtifacts')
     testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
+    testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
   }
 
   test {
diff --git 
a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java 
b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
index 1bf466d..0bc5b90 100644
--- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
+++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
@@ -363,8 +363,10 @@ public class GenericOrcWriter implements 
OrcValueWriter<Record> {
         output.isNull[rowId] = false;
         TimestampColumnVector cv = (TimestampColumnVector) output;
         cv.setIsUTC(true);
-        cv.time[rowId] = data.toInstant(ZoneOffset.UTC).toEpochMilli(); // 
millis
-        cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos 
to only keep microsecond precision
+        // millis
+        cv.time[rowId] = data.toInstant(ZoneOffset.UTC).toEpochMilli();
+        // truncate nanos to only keep microsecond precision
+        cv.nanos[rowId] = data.getNano() / 1_000 * 1_000;
       }
     }
   }
@@ -578,7 +580,13 @@ public class GenericOrcWriter implements 
OrcValueWriter<Record> {
       case VARCHAR:
         return new StringConverter();
       case DECIMAL:
-        return schema.getPrecision() <= 18 ? new Decimal18Converter(schema) : 
new Decimal38Converter(schema);
+        int precision = schema.getPrecision();
+        if (precision <= 18) {
+          return new Decimal18Converter(schema);
+        } else if (precision <= 38) {
+          return new Decimal38Converter(schema);
+        }
+        throw new IllegalArgumentException("Invalid precision: " + precision);
       case TIMESTAMP:
         return new TimestampConverter();
       case TIMESTAMP_INSTANT:
diff --git 
a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java 
b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
index da1ee6e..6e6eb56 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
@@ -32,7 +32,6 @@ import org.apache.orc.TypeDescription;
 import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.Decimal;
 
 /**
  * Converts the OrcIterator, which returns ORC's VectorizedRowBatch to a
@@ -103,11 +102,7 @@ public class SparkOrcReader implements 
OrcRowReader<InternalRow> {
         case TIMESTAMP_INSTANT:
           return SparkOrcValueReaders.timestampTzs();
         case DECIMAL:
-          if (primitive.getPrecision() <= Decimal.MAX_LONG_DIGITS()) {
-            return new 
SparkOrcValueReaders.Decimal18Reader(primitive.getPrecision(), 
primitive.getScale());
-          } else {
-            return new 
SparkOrcValueReaders.Decimal38Reader(primitive.getPrecision(), 
primitive.getScale());
-          }
+          return SparkOrcValueReaders.decimals(primitive.getPrecision(), 
primitive.getScale());
         case CHAR:
         case VARCHAR:
         case STRING:
diff --git 
a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java 
b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
index 55e3117..94bb0cb 100644
--- 
a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
+++ 
b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.orc.OrcValueReader;
 import org.apache.iceberg.orc.OrcValueReaders;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
@@ -55,6 +56,16 @@ class SparkOrcValueReaders {
     return TimestampTzReader.INSTANCE;
   }
 
+  public static OrcValueReader<Decimal> decimals(int precision, int scale) {
+    if (precision <= Decimal.MAX_LONG_DIGITS()) {
+      return new Decimal18Reader(precision, scale);
+    } else if (precision <= 38) {
+      return new Decimal38Reader(precision, scale);
+    } else {
+      throw new IllegalArgumentException("Invalid precision: " + precision);
+    }
+  }
+
   static OrcValueReader<?> struct(
       List<OrcValueReader<?>> readers, Types.StructType struct, Map<Integer, 
?> idToConstant) {
     return new StructReader(readers, struct, idToConstant);
@@ -159,13 +170,12 @@ class SparkOrcValueReaders {
 
     @Override
     public Long nonNullRead(ColumnVector vector, int row) {
-      TimestampColumnVector timestampVector = (TimestampColumnVector) vector;
-      return (timestampVector.time[row] / 1000) * 1_000_000 + 
timestampVector.nanos[row] / 1000;
+      TimestampColumnVector tcv = (TimestampColumnVector) vector;
+      return (Math.floorDiv(tcv.time[row], 1_000)) * 1_000_000 + 
Math.floorDiv(tcv.nanos[row], 1000);
     }
   }
 
-  static class Decimal18Reader implements OrcValueReader<Decimal> {
-    //TODO: these are being unused. check for bug
+  private static class Decimal18Reader implements OrcValueReader<Decimal> {
     private final int precision;
     private final int scale;
 
@@ -177,7 +187,15 @@ class SparkOrcValueReaders {
     @Override
     public Decimal nonNullRead(ColumnVector vector, int row) {
       HiveDecimalWritable value = ((DecimalColumnVector) vector).vector[row];
-      return new Decimal().set(value.serialize64(value.scale()), 
value.precision(), value.scale());
+
+      // The scale of a decimal read from a Hive ORC file may not equal the 
expected scale. For data type
+      // decimal(10,3) and the value 10.100, the Hive ORC writer will remove 
its trailing zeros and store it
+      // as 101*10^(-1), adjusting its scale from 3 to 1. So here we cannot 
assert that value.scale() == scale.
+      // We also need to convert the Hive ORC decimal to a decimal with the 
expected precision and scale.
+      Preconditions.checkArgument(value.precision() <= precision,
+          "Cannot read value as decimal(%s,%s), too large: %s", precision, 
scale, value);
+
+      return new Decimal().set(value.serialize64(scale), precision, scale);
     }
   }
 
@@ -194,6 +212,10 @@ class SparkOrcValueReaders {
     public Decimal nonNullRead(ColumnVector vector, int row) {
       BigDecimal value = ((DecimalColumnVector) vector).vector[row]
           .getHiveDecimal().bigDecimalValue();
+
+      Preconditions.checkArgument(value.precision() <= precision,
+          "Cannot read value as decimal(%s,%s), too large: %s", precision, 
scale, value);
+
       return new Decimal().set(new scala.math.BigDecimal(value), precision, 
scale);
     }
   }
diff --git 
a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java 
b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
index c27f958..9234c98 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
@@ -214,9 +214,9 @@ public class SparkOrcWriter implements 
OrcValueWriter<InternalRow> {
       } else {
         output.isNull[rowId] = false;
         TimestampColumnVector cv = (TimestampColumnVector) output;
-        long micros = data.getLong(column);
-        cv.time[rowId] = micros / 1_000; // millis
-        cv.nanos[rowId] = (int) (micros % 1_000_000) * 1_000; // nanos
+        long micros = data.getLong(column); // it could be negative
+        cv.time[rowId] = Math.floorDiv(micros, 1_000); // millis
+        cv.nanos[rowId] = (int) Math.floorMod(micros, 1_000_000) * 1_000; // 
nanos
       }
     }
   }
@@ -388,9 +388,13 @@ public class SparkOrcWriter implements 
OrcValueWriter<InternalRow> {
       case VARCHAR:
         return new StringConverter();
       case DECIMAL:
-        return schema.getPrecision() <= 18 ?
-            new Decimal18Converter(schema) :
-            new Decimal38Converter(schema);
+        int precision = schema.getPrecision();
+        if (precision <= 18) {
+          return new Decimal18Converter(schema);
+        } else if (precision <= 38) {
+          return new Decimal38Converter(schema);
+        }
+        throw new IllegalArgumentException("Invalid precision: " + precision);
       case TIMESTAMP_INSTANT:
         return new TimestampTzConverter();
       case STRUCT:
diff --git 
a/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java 
b/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java
index 821f5bd..0c4598a 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java
@@ -237,7 +237,7 @@ public class GenericsHelpers {
         break;
       case DATE:
         Assert.assertTrue("Should expect a LocalDate", expected instanceof 
LocalDate);
-        long expectedDays = ChronoUnit.DAYS.between(EPOCH_DAY, (LocalDate) 
expected);
+        int expectedDays = (int) ChronoUnit.DAYS.between(EPOCH_DAY, 
(LocalDate) expected);
         Assert.assertEquals("Primitive value should be equal to expected", 
expectedDays, actual);
         break;
       case TIMESTAMP:
diff --git 
a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkRecordOrcReaderWriter.java
 
b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkRecordOrcReaderWriter.java
new file mode 100644
index 0000000..1e7430d
--- /dev/null
+++ 
b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkRecordOrcReaderWriter.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestSparkRecordOrcReaderWriter extends AvroDataTest {
+  private static final int NUM_RECORDS = 200;
+
+  private void writeAndValidate(Schema schema, List<Record> expectedRecords) 
throws IOException {
+    final File originalFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", originalFile.delete());
+
+    // Write a few generic records into the original test file.
+    try (FileAppender<Record> writer = 
ORC.write(Files.localOutput(originalFile))
+        .createWriterFunc(GenericOrcWriter::buildWriter)
+        .schema(schema)
+        .build()) {
+      writer.addAll(expectedRecords);
+    }
+
+    // Read into spark InternalRow from the original test file.
+    List<InternalRow> internalRows = Lists.newArrayList();
+    try (CloseableIterable<InternalRow> reader = 
ORC.read(Files.localInput(originalFile))
+        .project(schema)
+        .createReaderFunc(readOrcSchema -> new SparkOrcReader(schema, 
readOrcSchema))
+        .build()) {
+      reader.forEach(internalRows::add);
+      assertEqualsUnsafe(schema.asStruct(), expectedRecords, reader, 
expectedRecords.size());
+    }
+
+    final File anotherFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", anotherFile.delete());
+
+    // Write those spark InternalRows into a new file again.
+    try (FileAppender<InternalRow> writer = 
ORC.write(Files.localOutput(anotherFile))
+        .createWriterFunc(SparkOrcWriter::new)
+        .schema(schema)
+        .build()) {
+      writer.addAll(internalRows);
+    }
+
+    // Check whether the InternalRows are expected records.
+    try (CloseableIterable<InternalRow> reader = 
ORC.read(Files.localInput(anotherFile))
+        .project(schema)
+        .createReaderFunc(readOrcSchema -> new SparkOrcReader(schema, 
readOrcSchema))
+        .build()) {
+      assertEqualsUnsafe(schema.asStruct(), expectedRecords, reader, 
expectedRecords.size());
+    }
+
+    // Read into iceberg GenericRecord and check again.
+    try (CloseableIterable<Record> reader = 
ORC.read(Files.localInput(anotherFile))
+        .createReaderFunc(typeDesc -> GenericOrcReader.buildReader(schema, 
typeDesc))
+        .project(schema)
+        .build()) {
+      assertRecordEquals(expectedRecords, reader, expectedRecords.size());
+    }
+  }
+
+  @Override
+  protected void writeAndValidate(Schema schema) throws IOException {
+    List<Record> expectedRecords = RandomGenericData.generate(schema, 
NUM_RECORDS, 1992L);
+    writeAndValidate(schema, expectedRecords);
+  }
+
+  @Test
+  public void testDecimalWithTrailingZero() throws IOException {
+    Schema schema = new Schema(
+        required(1, "d1", Types.DecimalType.of(10, 2)),
+        required(2, "d2", Types.DecimalType.of(20, 5)),
+        required(3, "d3", Types.DecimalType.of(38, 20))
+    );
+
+    List<Record> expected = Lists.newArrayList();
+
+    GenericRecord record = GenericRecord.create(schema);
+    record.set(0, new BigDecimal("101.00"));
+    record.set(1, new BigDecimal("10.00E-3"));
+    record.set(2, new BigDecimal("1001.0000E-16"));
+
+    expected.add(record.copy());
+
+    writeAndValidate(schema, expected);
+  }
+
+  private static void assertRecordEquals(Iterable<Record> expected, 
Iterable<Record> actual, int size) {
+    Iterator<Record> expectedIter = expected.iterator();
+    Iterator<Record> actualIter = actual.iterator();
+    for (int i = 0; i < size; i += 1) {
+      Assert.assertTrue("Expected iterator should have more rows", 
expectedIter.hasNext());
+      Assert.assertTrue("Actual iterator should have more rows", 
actualIter.hasNext());
+      Assert.assertEquals("Should have same rows.", expectedIter.next(), 
actualIter.next());
+    }
+    Assert.assertFalse("Expected iterator should not have any extra rows.", 
expectedIter.hasNext());
+    Assert.assertFalse("Actual iterator should not have any extra rows.", 
actualIter.hasNext());
+  }
+
+  private static void assertEqualsUnsafe(Types.StructType struct, 
Iterable<Record> expected,
+                                         Iterable<InternalRow> actual, int 
size) {
+    Iterator<Record> expectedIter = expected.iterator();
+    Iterator<InternalRow> actualIter = actual.iterator();
+    for (int i = 0; i < size; i += 1) {
+      Assert.assertTrue("Expected iterator should have more rows", 
expectedIter.hasNext());
+      Assert.assertTrue("Actual iterator should have more rows", 
actualIter.hasNext());
+      GenericsHelpers.assertEqualsUnsafe(struct, expectedIter.next(), 
actualIter.next());
+    }
+    Assert.assertFalse("Expected iterator should not have any extra rows.", 
expectedIter.hasNext());
+    Assert.assertFalse("Actual iterator should not have any extra rows.", 
actualIter.hasNext());
+  }
+}

Reply via email to