This is an automated email from the ASF dual-hosted git repository. blue pushed a commit to branch 0.9.x in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit 768dc08a2ce558ea309215fa1c864ec0c48a4e72 Author: openinx <[email protected]> AuthorDate: Fri Aug 7 23:58:24 2020 +0800 ORC: Fix decimal and timestamp bugs (#1271) --- build.gradle | 1 + .../apache/iceberg/data/orc/GenericOrcWriter.java | 14 +- .../apache/iceberg/spark/data/SparkOrcReader.java | 7 +- .../iceberg/spark/data/SparkOrcValueReaders.java | 32 ++++- .../apache/iceberg/spark/data/SparkOrcWriter.java | 16 ++- .../apache/iceberg/spark/data/GenericsHelpers.java | 2 +- .../spark/data/TestSparkRecordOrcReaderWriter.java | 148 +++++++++++++++++++++ 7 files changed, 199 insertions(+), 21 deletions(-) diff --git a/build.gradle b/build.gradle index 1879e80..4e8de83 100644 --- a/build.gradle +++ b/build.gradle @@ -451,6 +451,7 @@ project(':iceberg-spark') { } testCompile project(path: ':iceberg-hive', configuration: 'testArtifacts') testCompile project(path: ':iceberg-api', configuration: 'testArtifacts') + testCompile project(path: ':iceberg-data', configuration: 'testArtifacts') } test { diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java index 1bf466d..0bc5b90 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java @@ -363,8 +363,10 @@ public class GenericOrcWriter implements OrcValueWriter<Record> { output.isNull[rowId] = false; TimestampColumnVector cv = (TimestampColumnVector) output; cv.setIsUTC(true); - cv.time[rowId] = data.toInstant(ZoneOffset.UTC).toEpochMilli(); // millis - cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos to only keep microsecond precision + // millis + cv.time[rowId] = data.toInstant(ZoneOffset.UTC).toEpochMilli(); + // truncate nanos to only keep microsecond precision + cv.nanos[rowId] = data.getNano() / 1_000 * 1_000; } } } @@ -578,7 +580,13 @@ public class GenericOrcWriter implements OrcValueWriter<Record> { case VARCHAR: return new StringConverter(); case DECIMAL: - return schema.getPrecision() <= 18 ? new Decimal18Converter(schema) : new Decimal38Converter(schema); + int precision = schema.getPrecision(); + if (precision <= 18) { + return new Decimal18Converter(schema); + } else if (precision <= 38) { + return new Decimal38Converter(schema); + } + throw new IllegalArgumentException("Invalid precision: " + precision); case TIMESTAMP: return new TimestampConverter(); case TIMESTAMP_INSTANT: diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java index da1ee6e..6e6eb56 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java @@ -32,7 +32,6 @@ import org.apache.orc.TypeDescription; import org.apache.orc.storage.ql.exec.vector.StructColumnVector; import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.types.Decimal; /** * Converts the OrcIterator, which returns ORC's VectorizedRowBatch to a @@ -103,11 +102,7 @@ public class SparkOrcReader implements OrcRowReader<InternalRow> { case TIMESTAMP_INSTANT: return SparkOrcValueReaders.timestampTzs(); case DECIMAL: - if (primitive.getPrecision() <= Decimal.MAX_LONG_DIGITS()) { - return new SparkOrcValueReaders.Decimal18Reader(primitive.getPrecision(), primitive.getScale()); - } else { - return new SparkOrcValueReaders.Decimal38Reader(primitive.getPrecision(), primitive.getScale()); - } + return SparkOrcValueReaders.decimals(primitive.getPrecision(), primitive.getScale()); case CHAR: case VARCHAR: case STRING: diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java index 55e3117..94bb0cb 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import org.apache.iceberg.orc.OrcValueReader; import org.apache.iceberg.orc.OrcValueReaders; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; @@ -55,6 +56,16 @@ class SparkOrcValueReaders { return TimestampTzReader.INSTANCE; } + public static OrcValueReader<Decimal> decimals(int precision, int scale) { + if (precision <= Decimal.MAX_LONG_DIGITS()) { + return new Decimal18Reader(precision, scale); + } else if (precision <= 38) { + return new Decimal38Reader(precision, scale); + } else { + throw new IllegalArgumentException("Invalid precision: " + precision); + } + } + static OrcValueReader<?> struct( List<OrcValueReader<?>> readers, Types.StructType struct, Map<Integer, ?> idToConstant) { return new StructReader(readers, struct, idToConstant); @@ -159,13 +170,12 @@ class SparkOrcValueReaders { @Override public Long nonNullRead(ColumnVector vector, int row) { - TimestampColumnVector timestampVector = (TimestampColumnVector) vector; - return (timestampVector.time[row] / 1000) * 1_000_000 + timestampVector.nanos[row] / 1000; + TimestampColumnVector tcv = (TimestampColumnVector) vector; + return (Math.floorDiv(tcv.time[row], 1_000)) * 1_000_000 + Math.floorDiv(tcv.nanos[row], 1000); } } - static class Decimal18Reader implements OrcValueReader<Decimal> { - //TODO: these are being unused. check for bug + private static class Decimal18Reader implements OrcValueReader<Decimal> { private final int precision; private final int scale; @@ -177,7 +187,15 @@ class SparkOrcValueReaders { @Override public Decimal nonNullRead(ColumnVector vector, int row) { HiveDecimalWritable value = ((DecimalColumnVector) vector).vector[row]; - return new Decimal().set(value.serialize64(value.scale()), value.precision(), value.scale()); + + // The scale of decimal read from hive ORC file may be not equals to the expected scale. For data type + // decimal(10,3) and the value 10.100, the hive ORC writer will remove its trailing zero and store it + // as 101*10^(-1), its scale will adjust from 3 to 1. So here we could not assert that value.scale() == scale. + // we also need to convert the hive orc decimal to a decimal with expected precision and scale. + Preconditions.checkArgument(value.precision() <= precision, + "Cannot read value as decimal(%s,%s), too large: %s", precision, scale, value); + + return new Decimal().set(value.serialize64(scale), precision, scale); } } @@ -194,6 +212,10 @@ class SparkOrcValueReaders { public Decimal nonNullRead(ColumnVector vector, int row) { BigDecimal value = ((DecimalColumnVector) vector).vector[row] .getHiveDecimal().bigDecimalValue(); + + Preconditions.checkArgument(value.precision() <= precision, + "Cannot read value as decimal(%s,%s), too large: %s", precision, scale, value); + return new Decimal().set(new scala.math.BigDecimal(value), precision, scale); } } diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java index c27f958..9234c98 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java @@ -214,9 +214,9 @@ public class SparkOrcWriter implements OrcValueWriter<InternalRow> { } else { output.isNull[rowId] = false; TimestampColumnVector cv = (TimestampColumnVector) output; - long micros = data.getLong(column); - cv.time[rowId] = micros / 1_000; // millis - cv.nanos[rowId] = (int) (micros % 1_000_000) * 1_000; // nanos + long micros = data.getLong(column); // it could be negative + cv.time[rowId] = Math.floorDiv(micros, 1_000); // millis + cv.nanos[rowId] = (int) Math.floorMod(micros, 1_000_000) * 1_000; // nanos } } } @@ -388,9 +388,13 @@ public class SparkOrcWriter implements OrcValueWriter<InternalRow> { case VARCHAR: return new StringConverter(); case DECIMAL: - return schema.getPrecision() <= 18 ? - new Decimal18Converter(schema) : - new Decimal38Converter(schema); + int precision = schema.getPrecision(); + if (precision <= 18) { + return new Decimal18Converter(schema); + } else if (precision <= 38) { + return new Decimal38Converter(schema); + } + throw new IllegalArgumentException("Invalid precision: " + precision); case TIMESTAMP_INSTANT: return new TimestampTzConverter(); case STRUCT: diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java b/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java index 821f5bd..0c4598a 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java +++ b/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java @@ -237,7 +237,7 @@ public class GenericsHelpers { break; case DATE: Assert.assertTrue("Should expect a LocalDate", expected instanceof LocalDate); - long expectedDays = ChronoUnit.DAYS.between(EPOCH_DAY, (LocalDate) expected); + int expectedDays = (int) ChronoUnit.DAYS.between(EPOCH_DAY, (LocalDate) expected); Assert.assertEquals("Primitive value should be equal to expected", expectedDays, actual); break; case TIMESTAMP: diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkRecordOrcReaderWriter.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkRecordOrcReaderWriter.java new file mode 100644 index 0000000..1e7430d --- /dev/null +++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkRecordOrcReaderWriter.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.data; + +import java.io.File; +import java.io.IOException; +import java.math.BigDecimal; +import java.util.Iterator; +import java.util.List; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.orc.GenericOrcReader; +import org.apache.iceberg.data.orc.GenericOrcWriter; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.catalyst.InternalRow; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.iceberg.types.Types.NestedField.required; + +public class TestSparkRecordOrcReaderWriter extends AvroDataTest { + private static final int NUM_RECORDS = 200; + + private void writeAndValidate(Schema schema, List<Record> expectedRecords) throws IOException { + final File originalFile = temp.newFile(); + Assert.assertTrue("Delete should succeed", originalFile.delete()); + + // Write few generic records into the original test file. + try (FileAppender<Record> writer = ORC.write(Files.localOutput(originalFile)) + .createWriterFunc(GenericOrcWriter::buildWriter) + .schema(schema) + .build()) { + writer.addAll(expectedRecords); + } + + // Read into spark InternalRow from the original test file. + List<InternalRow> internalRows = Lists.newArrayList(); + try (CloseableIterable<InternalRow> reader = ORC.read(Files.localInput(originalFile)) + .project(schema) + .createReaderFunc(readOrcSchema -> new SparkOrcReader(schema, readOrcSchema)) + .build()) { + reader.forEach(internalRows::add); + assertEqualsUnsafe(schema.asStruct(), expectedRecords, reader, expectedRecords.size()); + } + + final File anotherFile = temp.newFile(); + Assert.assertTrue("Delete should succeed", anotherFile.delete()); + + // Write those spark InternalRows into a new file again. + try (FileAppender<InternalRow> writer = ORC.write(Files.localOutput(anotherFile)) + .createWriterFunc(SparkOrcWriter::new) + .schema(schema) + .build()) { + writer.addAll(internalRows); + } + + // Check whether the InternalRows are expected records. + try (CloseableIterable<InternalRow> reader = ORC.read(Files.localInput(anotherFile)) + .project(schema) + .createReaderFunc(readOrcSchema -> new SparkOrcReader(schema, readOrcSchema)) + .build()) { + assertEqualsUnsafe(schema.asStruct(), expectedRecords, reader, expectedRecords.size()); + } + + // Read into iceberg GenericRecord and check again. + try (CloseableIterable<Record> reader = ORC.read(Files.localInput(anotherFile)) + .createReaderFunc(typeDesc -> GenericOrcReader.buildReader(schema, typeDesc)) + .project(schema) + .build()) { + assertRecordEquals(expectedRecords, reader, expectedRecords.size()); + } + } + + @Override + protected void writeAndValidate(Schema schema) throws IOException { + List<Record> expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1992L); + writeAndValidate(schema, expectedRecords); + } + + @Test + public void testDecimalWithTrailingZero() throws IOException { + Schema schema = new Schema( + required(1, "d1", Types.DecimalType.of(10, 2)), + required(2, "d2", Types.DecimalType.of(20, 5)), + required(3, "d3", Types.DecimalType.of(38, 20)) + ); + + List<Record> expected = Lists.newArrayList(); + + GenericRecord record = GenericRecord.create(schema); + record.set(0, new BigDecimal("101.00")); + record.set(1, new BigDecimal("10.00E-3")); + record.set(2, new BigDecimal("1001.0000E-16")); + + expected.add(record.copy()); + + writeAndValidate(schema, expected); + } + + private static void assertRecordEquals(Iterable<Record> expected, Iterable<Record> actual, int size) { + Iterator<Record> expectedIter = expected.iterator(); + Iterator<Record> actualIter = actual.iterator(); + for (int i = 0; i < size; i += 1) { + Assert.assertTrue("Expected iterator should have more rows", expectedIter.hasNext()); + Assert.assertTrue("Actual iterator should have more rows", actualIter.hasNext()); + Assert.assertEquals("Should have same rows.", expectedIter.next(), actualIter.next()); + } + Assert.assertFalse("Expected iterator should not have any extra rows.", expectedIter.hasNext()); + Assert.assertFalse("Actual iterator should not have any extra rows.", actualIter.hasNext()); + } + + private static void assertEqualsUnsafe(Types.StructType struct, Iterable<Record> expected, + Iterable<InternalRow> actual, int size) { + Iterator<Record> expectedIter = expected.iterator(); + Iterator<InternalRow> actualIter = actual.iterator(); + for (int i = 0; i < size; i += 1) { + Assert.assertTrue("Expected iterator should have more rows", expectedIter.hasNext()); + Assert.assertTrue("Actual iterator should have more rows", actualIter.hasNext()); + GenericsHelpers.assertEqualsUnsafe(struct, expectedIter.next(), actualIter.next()); + } + Assert.assertFalse("Expected iterator should not have any extra rows.", expectedIter.hasNext()); + Assert.assertFalse("Actual iterator should not have any extra rows.", actualIter.hasNext()); + } +}
