This is an automated email from the ASF dual-hosted git repository. codope pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new e63722d24b5 [HUDI-7852] Constrain the comparison of different types of ordering values to limited cases (#11424) e63722d24b5 is described below commit e63722d24b5f42d5411f02c0b0872310111887b3 Author: Y Ethan Guo <ethan.guoyi...@gmail.com> AuthorDate: Mon Jun 10 04:50:44 2024 -0700 [HUDI-7852] Constrain the comparison of different types of ordering values to limited cases (#11424) --- .../hudi/BaseSparkInternalRowReaderContext.java | 10 +++ .../hudi/common/engine/HoodieReaderContext.java | 12 +++ .../read/HoodieBaseFileGroupRecordBuffer.java | 23 ++++-- .../table/read/TestHoodieFileGroupReaderBase.java | 90 +++++++++++++++------- 4 files changed, 99 insertions(+), 36 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java index 72063dd472e..7fb4577f896 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java @@ -39,6 +39,7 @@ import org.apache.spark.sql.HoodieUnsafeRowUtils; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.UnsafeRow; import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; import java.util.Map; import java.util.function.UnaryOperator; @@ -137,6 +138,15 @@ public abstract class BaseSparkInternalRowReaderContext extends HoodieReaderCont } + @Override + public int compareTo(Comparable o1, Comparable o2) { + if ((o1 instanceof String && o2 instanceof UTF8String) + || (o1 instanceof UTF8String && o2 instanceof String)) { + return o1.toString().compareTo(o2.toString()); + } + return super.compareTo(o1, o2); + } + protected UnaryOperator<InternalRow> getIdentityProjection() { return row -> row; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java index b79562f8b43..94b9e1cd02d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java @@ -250,6 +250,18 @@ public abstract class HoodieReaderContext<T> { */ public abstract T seal(T record); + /** + * Compares values in different types which can contain engine-specific types. + * + * @param o1 {@link Comparable} object. + * @param o2 other {@link Comparable} object to compare to. + * @return comparison result. + */ + public int compareTo(Comparable o1, Comparable o2) { + throw new IllegalArgumentException("Cannot compare values in different types: " + + o1 + "(" + o1.getClass() + "), " + o2 + "(" + o2.getClass() + ")"); + } + /** * Generates metadata map based on the information. * diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java index 984d9740ceb..c25fba0e928 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java @@ -157,26 +157,33 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr * Compares two {@link Comparable}s. If both are numbers, converts them to {@link Long} for comparison. * If one of the {@link Comparable}s is a String, assumes that both are String values for comparison. * + * @param readerContext {@link HoodieReaderContext} instance. * @param o1 {@link Comparable} object. * @param o2 other {@link Comparable} object to compare to. * @return comparison result. */ @VisibleForTesting - static int compareTo(Comparable o1, Comparable o2) { + static int compareTo(HoodieReaderContext readerContext, Comparable o1, Comparable o2) { // TODO(HUDI-7848): fix the delete records to contain the correct ordering value type // so this util with the number comparison is not necessary. try { return o1.compareTo(o2); } catch (ClassCastException e) { - if (o1 instanceof Number && o2 instanceof Number) { + boolean isO1LongOrInteger = (o1 instanceof Long || o1 instanceof Integer); + boolean isO2LongOrInteger = (o2 instanceof Long || o2 instanceof Integer); + boolean isO1DoubleOrFloat = (o1 instanceof Double || o1 instanceof Float); + boolean isO2DoubleOrFloat = (o2 instanceof Double || o2 instanceof Float); + if (isO1LongOrInteger && isO2LongOrInteger) { Long o1LongValue = ((Number) o1).longValue(); Long o2LongValue = ((Number) o2).longValue(); return o1LongValue.compareTo(o2LongValue); - } else if (o1 instanceof String || o2 instanceof String) { - return o1.toString().compareTo(o2.toString()); + } else if ((isO1LongOrInteger && isO2DoubleOrFloat) + || (isO1DoubleOrFloat && isO2LongOrInteger)) { + Double o1DoubleValue = ((Number) o1).doubleValue(); + Double o2DoubleValue = ((Number) o2).doubleValue(); + return o1DoubleValue.compareTo(o2DoubleValue); } else { - throw new IllegalArgumentException("Cannot compare values in different types: " - + o1 + "(" + o1.getClass() + "), " + o2 + "(" + o2.getClass() + ")"); + return readerContext.compareTo(o1, o2); } } catch (Throwable e) { throw new HoodieException("Cannot compare values: " @@ -236,7 +243,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr } Comparable incomingOrderingValue = readerContext.getOrderingValue( Option.of(record), metadata, readerSchema, payloadProps); - if (compareTo(incomingOrderingValue, existingOrderingValue) > 0) { + if (compareTo(readerContext, incomingOrderingValue, existingOrderingValue) > 0) { return Option.of(Pair.of(record, metadata)); } return Option.empty(); @@ -399,7 +406,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr if (isDeleteRecordWithNaturalOrder(newer, newOrderingValue)) { return Option.empty(); } - if (compareTo(oldOrderingValue, newOrderingValue) > 0) { + if (compareTo(readerContext, oldOrderingValue, newOrderingValue) > 0) { return older; } return newer; diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java index 9f3f8acf81c..a8717d8a8e3 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java @@ -55,6 +55,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Stream; +import static org.apache.hudi.common.model.WriteOperationType.BULK_INSERT; import static org.apache.hudi.common.model.WriteOperationType.INSERT; import static org.apache.hudi.common.model.WriteOperationType.UPSERT; import static org.apache.hudi.common.table.HoodieTableConfig.PARTITION_FIELDS; @@ -64,6 +65,7 @@ import static org.apache.hudi.common.table.read.HoodieBaseFileGroupRecordBuffer. import static org.apache.hudi.common.testutils.HoodieTestUtils.getLogFileListFromFileSlice; import static org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.params.provider.Arguments.arguments; /** @@ -90,36 +92,68 @@ public abstract class TestHoodieFileGroupReaderBase<T> { public abstract Comparable getComparableUTF8String(String value); @Test - public void testCompareToComparable() { + public void testCompareToComparable() throws Exception { + Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs()); + // Prepare a table for initializing reader context + try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF)) { + commitToTable(recordsToStrings(dataGen.generateInserts("001", 1)), BULK_INSERT.value(), writeConfigs); + } + StorageConfiguration<?> storageConf = getStorageConf(); + String tablePath = getBasePath(); + HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storageConf, tablePath); + Schema avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema(); + HoodieReaderContext<T> readerContext = getHoodieReaderContext(tablePath, avroSchema, storageConf); + // Test same type - assertEquals(1, compareTo(Boolean.TRUE, Boolean.FALSE)); - assertEquals(0, compareTo(Boolean.TRUE, Boolean.TRUE)); - assertEquals(-1, compareTo(Boolean.FALSE, Boolean.TRUE)); - assertEquals(1, compareTo(20, 15)); - assertEquals(0, compareTo(15, 15)); - assertEquals(-1, compareTo(10, 15)); - assertEquals(1, compareTo(1.1f, 1.0f)); - assertEquals(0, compareTo(1.0f, 1.0f)); - assertEquals(-1, compareTo(0.9f, 1.0f)); - assertEquals(1, compareTo(1.1, 1.0)); - assertEquals(0, compareTo(1.0, 1.0)); - assertEquals(-1, compareTo(0.9, 1.0)); - assertEquals(1, compareTo("value2", "value1")); - assertEquals(0, compareTo("value1", "value1")); - assertEquals(-1, compareTo("value1", "value2")); + assertEquals(1, compareTo(readerContext, Boolean.TRUE, Boolean.FALSE)); + assertEquals(0, compareTo(readerContext, Boolean.TRUE, Boolean.TRUE)); + assertEquals(-1, compareTo(readerContext, Boolean.FALSE, Boolean.TRUE)); + assertEquals(1, compareTo(readerContext, 20, 15)); + assertEquals(0, compareTo(readerContext, 15, 15)); + assertEquals(-1, compareTo(readerContext, 10, 15)); + assertEquals(1, compareTo(readerContext, 1.1f, 1.0f)); + assertEquals(0, compareTo(readerContext, 1.0f, 1.0f)); + assertEquals(-1, compareTo(readerContext, 0.9f, 1.0f)); + assertEquals(1, compareTo(readerContext, 1.1, 1.0)); + assertEquals(0, compareTo(readerContext, 1.0, 1.0)); + assertEquals(-1, compareTo(readerContext, 0.9, 1.0)); + assertEquals(1, compareTo(readerContext, 1.1, 1)); + assertEquals(-1, compareTo(readerContext, 0.9, 1)); + assertEquals(1, compareTo(readerContext, "value2", "value1")); + assertEquals(0, compareTo(readerContext, "value1", "value1")); + assertEquals(-1, compareTo(readerContext, "value1", "value2")); // Test different types which are comparable - assertEquals(1, compareTo(Long.MAX_VALUE / 2L, 10)); - assertEquals(1, compareTo(20, 10L)); - assertEquals(0, compareTo(10L, 10)); - assertEquals(0, compareTo(10, 10L)); - assertEquals(-1, compareTo(10, Long.MAX_VALUE)); - assertEquals(-1, compareTo(10L, 20)); - assertEquals(1, compareTo(getComparableUTF8String("value2"), "value1")); - assertEquals(1, compareTo("value2", getComparableUTF8String("value1"))); - assertEquals(0, compareTo(getComparableUTF8String("value1"), "value1")); - assertEquals(0, compareTo("value1", getComparableUTF8String("value1"))); - assertEquals(-1, compareTo(getComparableUTF8String("value1"), "value2")); - assertEquals(-1, compareTo("value1", getComparableUTF8String("value2"))); + assertEquals(1, compareTo(readerContext, Long.MAX_VALUE / 2L, 10)); + assertEquals(1, compareTo(readerContext, 20, 10L)); + assertEquals(0, compareTo(readerContext, 10L, 10)); + assertEquals(0, compareTo(readerContext, 10, 10L)); + assertEquals(-1, compareTo(readerContext, 10, Long.MAX_VALUE)); + assertEquals(-1, compareTo(readerContext, 10L, 20)); + assertEquals(1, compareTo(readerContext, 10.01f, 10)); + assertEquals(1, compareTo(readerContext, 10.01f, 10L)); + assertEquals(1, compareTo(readerContext, 10.01, 10)); + assertEquals(1, compareTo(readerContext, 10.01, 10L)); + assertEquals(1, compareTo(readerContext, 11L, 10.99f)); + assertEquals(1, compareTo(readerContext, 11, 10.99)); + // Throw exception if comparing Double with Float which have different precision + assertThrows(IllegalArgumentException.class, () -> compareTo(readerContext, 10.01f, 10.0)); + assertThrows(IllegalArgumentException.class, () -> compareTo(readerContext, 10.01, 10.0f)); + assertEquals(0, compareTo(readerContext, 10.0, 10L)); + assertEquals(0, compareTo(readerContext, 10.0f, 10L)); + assertEquals(0, compareTo(readerContext, 10.0, 10)); + assertEquals(0, compareTo(readerContext, 10.0f, 10)); + assertEquals(-1, compareTo(readerContext, 9.99f, 10)); + assertEquals(-1, compareTo(readerContext, 9.99f, 10L)); + assertEquals(-1, compareTo(readerContext, 9.99, 10)); + assertEquals(-1, compareTo(readerContext, 9.99, 10L)); + assertEquals(-1, compareTo(readerContext, 10L, 10.01f)); + assertEquals(-1, compareTo(readerContext, 10, 10.01)); + assertEquals(1, compareTo(readerContext, getComparableUTF8String("value2"), "value1")); + assertEquals(1, compareTo(readerContext, "value2", getComparableUTF8String("value1"))); + assertEquals(0, compareTo(readerContext, getComparableUTF8String("value1"), "value1")); + assertEquals(0, compareTo(readerContext, "value1", getComparableUTF8String("value1"))); + assertEquals(-1, compareTo(readerContext, getComparableUTF8String("value1"), "value2")); + assertEquals(-1, compareTo(readerContext, "value1", getComparableUTF8String("value2"))); } private static Stream<Arguments> testArguments() {