This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 780a2ac [HUDI-2102] Support hilbert curve for hudi (#3952) 780a2ac is described below commit 780a2ac5b208b06f53ee64186b5392ebc37eadbc Author: xiarixiaoyao <mengtao0...@qq.com> AuthorDate: Sat Nov 27 15:20:19 2021 +0800 [HUDI-2102] Support hilbert curve for hudi (#3952) Co-authored-by: Y Ethan Guo <ethan.guoyi...@gmail.com> --- NOTICE | 6 + hudi-client/hudi-client-common/pom.xml | 7 + .../apache/hudi/config/HoodieClusteringConfig.java | 28 ++ .../apache/hudi/optimize/HilbertCurveUtils.java | 52 +++ .../org/apache/hudi/optimize/ZOrderingUtil.java | 7 +- .../hudi/optimize/TestHilbertCurveUtils.java | 38 ++ .../apache/hudi/optimize/TestZOrderingUtil.java | 25 ++ ...RDDSpatialCurveOptimizationSortPartitioner.java | 8 +- .../hudi/index/zorder/ZOrderingIndexHelper.java | 17 +- .../hudi/table/HoodieSparkCopyOnWriteTable.java | 8 +- .../java/org/apache/spark/OrderingIndexHelper.java | 430 +++++++++++++++++++++ .../spark/sql/hudi/execution/RangeSample.scala | 45 ++- .../functional/TestTableLayoutOptimization.scala | 248 ++++++++++++ .../benchmark/SpaceCurveOptimizeBenchMark.scala | 118 ++++++ packaging/hudi-flink-bundle/pom.xml | 2 + packaging/hudi-kafka-connect-bundle/pom.xml | 2 + packaging/hudi-spark-bundle/pom.xml | 2 + packaging/hudi-utilities-bundle/pom.xml | 2 + 18 files changed, 1015 insertions(+), 30 deletions(-) diff --git a/NOTICE b/NOTICE index 9b24933..437b974 100644 --- a/NOTICE +++ b/NOTICE @@ -159,3 +159,9 @@ its NOTICE file: This product includes software developed at StreamSets (http://www.streamsets.com/). +-------------------------------------------------------------------------------- + +This product includes code from hilbert-curve project + * Copyright https://github.com/davidmoten/hilbert-curve + * Licensed under the Apache-2.0 License + diff --git a/hudi-client/hudi-client-common/pom.xml b/hudi-client/hudi-client-common/pom.xml index d30ee29..22ad8ec 100644 --- a/hudi-client/hudi-client-common/pom.xml +++ b/hudi-client/hudi-client-common/pom.xml @@ -64,6 +64,13 @@ <artifactId>parquet-avro</artifactId> </dependency> + <!-- Hilbert Curve --> + <dependency> + <groupId>com.github.davidmoten</groupId> + <artifactId>hilbert-curve</artifactId> + <version>0.2.2</version> + </dependency> + <!-- Dropwizard Metrics --> <dependency> <groupId>io.dropwizard.metrics</groupId> diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java index 9a10965..676f2ff 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java @@ -542,4 +542,32 @@ public class HoodieClusteringConfig extends HoodieConfig { } } } + + /** + * strategy types for optimize layout for hudi data. 
+ */ + public enum BuildLayoutOptimizationStrategy { + ZORDER("z-order"), + HILBERT("hilbert"); + private final String value; + + BuildLayoutOptimizationStrategy(String value) { + this.value = value; + } + + public String toCustomString() { + return value; + } + + public static BuildLayoutOptimizationStrategy fromValue(String value) { + switch (value.toLowerCase(Locale.ROOT)) { + case "z-order": + return ZORDER; + case "hilbert": + return HILBERT; + default: + throw new HoodieException("Invalid value of Type."); + } + } + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurveUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurveUtils.java new file mode 100644 index 0000000..0f216ab --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurveUtils.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.optimize; + +import org.davidmoten.hilbert.HilbertCurve; + +import java.math.BigInteger; + +/** + * Utils for Hilbert Curve. 
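For orientation, a minimal runnable sketch of the hilbert-curve library API that HilbertCurveUtils below wraps; the class name here is illustrative, and the expected index values come from the new TestHilbertCurveUtils in this commit.

import org.davidmoten.hilbert.HilbertCurve;
import java.math.BigInteger;

// Illustrative only; mirrors the values asserted in TestHilbertCurveUtils below.
public class HilbertIndexSketch {
  public static void main(String[] args) {
    HilbertCurve curve = HilbertCurve.bits(5).dimensions(2); // 5 bits per dimension, 2 dimensions
    BigInteger a = curve.index(new long[] {1, 2});   // 13
    BigInteger b = curve.index(new long[] {0, 16});  // 256
    // HilbertCurveUtils.paddingToNByte then left-pads the big-endian index bytes with zeros
    // (or truncates them) so every index has the same width and sorts correctly as raw bytes.
    System.out.println(a + " " + b);
  }
}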
+ */ +public class HilbertCurveUtils { + public static byte[] indexBytes(HilbertCurve hilbertCurve, long[] points, int paddingNum) { + BigInteger index = hilbertCurve.index(points); + return paddingToNByte(index.toByteArray(), paddingNum); + } + + public static byte[] paddingToNByte(byte[] a, int paddingNum) { + if (a.length == paddingNum) { + return a; + } + if (a.length > paddingNum) { + byte[] result = new byte[paddingNum]; + System.arraycopy(a, 0, result, 0, paddingNum); + return result; + } + int paddingSize = paddingNum - a.length; + byte[] result = new byte[paddingNum]; + for (int i = 0; i < paddingSize; i++) { + result[i] = 0; + } + System.arraycopy(a, 0, result, paddingSize, a.length); + return result; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java index 3aa8080..50827cc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java @@ -176,9 +176,14 @@ public class ZOrderingUtil { public static Long convertStringToLong(String a) { byte[] bytes = utf8To8Byte(a); + return convertBytesToLong(bytes); + } + + public static long convertBytesToLong(byte[] bytes) { + byte[] paddedBytes = paddingTo8Byte(bytes); long temp = 0L; for (int i = 7; i >= 0; i--) { - temp = temp | (((long)bytes[i] & 0xff) << (7 - i) * 8); + temp = temp | (((long) paddedBytes[i] & 0xff) << (7 - i) * 8); } return temp; } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestHilbertCurveUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestHilbertCurveUtils.java new file mode 100644 index 0000000..5bb482e --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestHilbertCurveUtils.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.optimize; + +import org.davidmoten.hilbert.HilbertCurve; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestHilbertCurveUtils { + + private static final HilbertCurve INSTANCE = HilbertCurve.bits(5).dimensions(2); + + @Test + public void testIndex() { + long[] t = {1, 2}; + assertEquals(13, INSTANCE.index(t).intValue()); + long[] t1 = {0, 16}; + assertEquals(256, INSTANCE.index(t1).intValue()); + } +} diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java index 7dab6c2..a22485f 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java @@ -126,4 +126,29 @@ public class TestZOrderingUtil { this.originValue = originValue; } } + + @Test + public void testConvertBytesToLong() { + long[] tests = new long[] {Long.MIN_VALUE, -1L, 0, 1L, Long.MAX_VALUE}; + for (int i = 0; i < tests.length; i++) { + assertEquals(ZOrderingUtil.convertBytesToLong(convertLongToBytes(tests[i])), tests[i]); + } + } + + @Test + public void testConvertBytesToLongWithPadding() { + byte[] bytes = new byte[2]; + bytes[0] = 2; + bytes[1] = 127; + assertEquals(ZOrderingUtil.convertBytesToLong(bytes), 2 * 256 + 127); + } + + private byte[] convertLongToBytes(long num) { + byte[] byteNum = new byte[8]; + for (int i = 0; i < 8; i++) { + int offset = 64 - (i + 1) * 8; + byteNum[i] = (byte) ((num >> offset) & 0xff); + } + return byteNum; + } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java index 03fdf5a..51526fc 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java @@ -33,7 +33,7 @@ import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; -import org.apache.hudi.index.zorder.ZOrderingIndexHelper; +import org.apache.spark.OrderingIndexHelper; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -79,10 +79,12 @@ public class RDDSpatialCurveOptimizationSortPartitioner<T extends HoodieRecordPa switch (config.getLayoutOptimizationCurveBuildMethod()) { case DIRECT: - zDataFrame = ZOrderingIndexHelper.createZIndexedDataFrameByMapValue(originDF, config.getClusteringSortColumns(), numOutputGroups); + zDataFrame = OrderingIndexHelper + .createOptimizedDataFrameByMapValue(originDF, config.getClusteringSortColumns(), numOutputGroups, config.getLayoutOptimizationStrategy()); break; case SAMPLE: - zDataFrame = ZOrderingIndexHelper.createZIndexedDataFrameBySample(originDF, config.getClusteringSortColumns(), numOutputGroups); + zDataFrame = OrderingIndexHelper + .createOptimizeDataFrameBySample(originDF, config.getClusteringSortColumns(), numOutputGroups, config.getLayoutOptimizationStrategy()); break; default: throw new HoodieException("Not a valid build curve method for doWriteOperation: 
"); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/zorder/ZOrderingIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/zorder/ZOrderingIndexHelper.java index 248c15c..934d1b9 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/zorder/ZOrderingIndexHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/zorder/ZOrderingIndexHelper.java @@ -18,17 +18,19 @@ package org.apache.hudi.index.zorder; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieColumnRangeMetadata; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.optimize.ZOrderingUtil; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.io.api.Binary; @@ -62,10 +64,10 @@ import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.types.StructType$; import org.apache.spark.sql.types.TimestampType; import org.apache.spark.util.SerializableConfiguration; -import scala.collection.JavaConversions; import javax.annotation.Nonnull; import javax.annotation.Nullable; + import java.io.IOException; import java.math.BigDecimal; import java.util.ArrayList; @@ -77,6 +79,8 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import scala.collection.JavaConversions; + import static org.apache.hudi.util.DataTypeUtils.areCompatible; public class ZOrderingIndexHelper { @@ -189,7 +193,8 @@ public class ZOrderingIndexHelper { } public static Dataset<Row> createZIndexedDataFrameBySample(Dataset<Row> df, List<String> zCols, int fileNum) { - return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum); + return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum, + HoodieClusteringConfig.BuildLayoutOptimizationStrategy.ZORDER.toCustomString()); } public static Dataset<Row> createZIndexedDataFrameBySample(Dataset<Row> df, String zCols, int fileNum) { @@ -584,7 +589,7 @@ public class ZOrderingIndexHelper { * @VisibleForTesting */ @Nonnull - static String createIndexMergeSql( + public static String createIndexMergeSql( @Nonnull String originalIndexTable, @Nonnull String newIndexTable, @Nonnull List<String> columns diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index 280d24f..7d2fbd3 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -18,8 +18,6 @@ package org.apache.hudi.table; -import org.apache.avro.Schema; -import org.apache.hadoop.fs.Path; import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCleanMetadata; @@ 
-49,6 +47,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.exception.HoodieUpsertException; +import org.apache.hudi.index.zorder.ZOrderingIndexHelper; import org.apache.hudi.io.HoodieCreateHandle; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.io.HoodieSortedMergeHandle; @@ -76,12 +75,15 @@ import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor; import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor; import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor; import org.apache.hudi.table.action.savepoint.SavepointActionExecutor; + +import org.apache.avro.Schema; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.hudi.index.zorder.ZOrderingIndexHelper; import org.apache.spark.api.java.JavaRDD; import javax.annotation.Nonnull; + import java.io.IOException; import java.util.Arrays; import java.util.Collections; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java new file mode 100644 index 0000000..67b1c67 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieColumnRangeMetadata; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.util.BaseFileUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ParquetUtils; +import org.apache.hudi.config.HoodieClusteringConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.index.zorder.ZOrderingIndexHelper; +import org.apache.hudi.optimize.HilbertCurveUtils; +import org.apache.hudi.optimize.ZOrderingUtil; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.io.api.Binary; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.Row$; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.hudi.execution.RangeSampleSort$; +import org.apache.spark.sql.hudi.execution.ZorderingBinarySort; +import org.apache.spark.sql.types.BinaryType; +import org.apache.spark.sql.types.BinaryType$; +import org.apache.spark.sql.types.BooleanType; +import org.apache.spark.sql.types.ByteType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DateType; +import org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.DoubleType; +import org.apache.spark.sql.types.FloatType; +import org.apache.spark.sql.types.IntegerType; +import org.apache.spark.sql.types.LongType; +import org.apache.spark.sql.types.LongType$; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.ShortType; +import org.apache.spark.sql.types.StringType; +import org.apache.spark.sql.types.StringType$; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType$; +import org.apache.spark.sql.types.TimestampType; +import org.apache.spark.util.SerializableConfiguration; +import org.davidmoten.hilbert.HilbertCurve; + +import java.io.IOException; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import scala.collection.JavaConversions; + +public class OrderingIndexHelper { + + private static final String SPARK_JOB_DESCRIPTION = "spark.job.description"; + + /** + * Create optimized DataFrame directly + * only support base type data. long,int,short,double,float,string,timestamp,decimal,date,byte + * this method is more effective than createOptimizeDataFrameBySample + * + * @param df a spark DataFrame holds parquet files to be read. + * @param sortCols ordering columns for the curve + * @param fileNum spark partition num + * @param sortMode layout optimization strategy + * @return a dataFrame ordered by the curve. 
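A minimal call-site sketch for the two entry points, assuming an existing Dataset<Row> df with columns c1 and c2 and a target of 200 output files; the column names and file count are illustrative.

// Direct (map-value) path: limited to the primitive types listed above.
Dataset<Row> byMap = OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1, c2", 200, "hilbert");
// Sample-based path: needs more resources but supports all column types.
Dataset<Row> bySample = OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1, c2", 200, "z-order");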
+ */ + public static Dataset<Row> createOptimizedDataFrameByMapValue(Dataset<Row> df, List<String> sortCols, int fileNum, String sortMode) { + Map<String, StructField> columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e)); + int fieldNum = df.schema().fields().length; + List<String> checkCols = sortCols.stream().filter(f -> columnsMap.containsKey(f)).collect(Collectors.toList()); + if (sortCols.size() != checkCols.size()) { + return df; + } + // only one col to sort, no need to use z-order + if (sortCols.size() == 1) { + return df.repartitionByRange(fieldNum, org.apache.spark.sql.functions.col(sortCols.get(0))); + } + Map<Integer, StructField> fieldMap = sortCols + .stream().collect(Collectors.toMap(e -> Arrays.asList(df.schema().fields()).indexOf(columnsMap.get(e)), e -> columnsMap.get(e))); + // do optimize + JavaRDD<Row> sortedRDD = null; + switch (HoodieClusteringConfig.BuildLayoutOptimizationStrategy.fromValue(sortMode)) { + case ZORDER: + sortedRDD = createZCurveSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, fileNum); + break; + case HILBERT: + sortedRDD = createHilbertSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, fileNum); + break; + default: + throw new IllegalArgumentException(String.format("new only support z-order/hilbert optimize but find: %s", sortMode)); + } + // create new StructType + List<StructField> newFields = new ArrayList<>(); + newFields.addAll(Arrays.asList(df.schema().fields())); + newFields.add(new StructField("Index", BinaryType$.MODULE$, true, Metadata.empty())); + + // create new DataFrame + return df.sparkSession().createDataFrame(sortedRDD, StructType$.MODULE$.apply(newFields)).drop("Index"); + } + + private static JavaRDD<Row> createZCurveSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) { + return originRDD.map(row -> { + List<byte[]> zBytesList = fieldMap.entrySet().stream().map(entry -> { + int index = entry.getKey(); + StructField field = entry.getValue(); + DataType dataType = field.dataType(); + if (dataType instanceof LongType) { + return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index)); + } else if (dataType instanceof DoubleType) { + return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index)); + } else if (dataType instanceof IntegerType) { + return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index)); + } else if (dataType instanceof FloatType) { + return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index)); + } else if (dataType instanceof StringType) { + return ZOrderingUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index)); + } else if (dataType instanceof DateType) { + return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime()); + } else if (dataType instanceof TimestampType) { + return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime()); + } else if (dataType instanceof ByteType) { + return ZOrderingUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index)); + } else if (dataType instanceof ShortType) { + return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index)); + } else if (dataType instanceof DecimalType) { + return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? 
Long.MAX_VALUE : row.getDecimal(index).longValue()); + } else if (dataType instanceof BooleanType) { + boolean value = row.isNullAt(index) ? false : row.getBoolean(index); + return ZOrderingUtil.intTo8Byte(value ? 1 : 0); + } else if (dataType instanceof BinaryType) { + return ZOrderingUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index)); + } + return null; + }).filter(f -> f != null).collect(Collectors.toList()); + byte[][] zBytes = new byte[zBytesList.size()][]; + for (int i = 0; i < zBytesList.size(); i++) { + zBytes[i] = zBytesList.get(i); + } + List<Object> zVaules = new ArrayList<>(); + zVaules.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava()); + zVaules.add(ZOrderingUtil.interleaving(zBytes, 8)); + return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(zVaules)); + }).sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true, fileNum); + } + + private static JavaRDD<Row> createHilbertSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) { + return originRDD.mapPartitions(rows -> { + HilbertCurve hilbertCurve = HilbertCurve.bits(63).dimensions(fieldMap.size()); + return new Iterator<Row>() { + + @Override + public boolean hasNext() { + return rows.hasNext(); + } + + @Override + public Row next() { + Row row = rows.next(); + List<Long> longList = fieldMap.entrySet().stream().map(entry -> { + int index = entry.getKey(); + StructField field = entry.getValue(); + DataType dataType = field.dataType(); + if (dataType instanceof LongType) { + return row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index); + } else if (dataType instanceof DoubleType) { + return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits(row.getDouble(index)); + } else if (dataType instanceof IntegerType) { + return row.isNullAt(index) ? Long.MAX_VALUE : (long)row.getInt(index); + } else if (dataType instanceof FloatType) { + return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits((double) row.getFloat(index)); + } else if (dataType instanceof StringType) { + return row.isNullAt(index) ? Long.MAX_VALUE : ZOrderingUtil.convertStringToLong(row.getString(index)); + } else if (dataType instanceof DateType) { + return row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime(); + } else if (dataType instanceof TimestampType) { + return row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime(); + } else if (dataType instanceof ByteType) { + return row.isNullAt(index) ? Long.MAX_VALUE : ZOrderingUtil.convertBytesToLong(new byte[] {row.getByte(index)}); + } else if (dataType instanceof ShortType) { + return row.isNullAt(index) ? Long.MAX_VALUE : (long)row.getShort(index); + } else if (dataType instanceof DecimalType) { + return row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue(); + } else if (dataType instanceof BooleanType) { + boolean value = row.isNullAt(index) ? false : row.getBoolean(index); + return value ? Long.MAX_VALUE : 0; + } else if (dataType instanceof BinaryType) { + return row.isNullAt(index) ? 
Long.MAX_VALUE : ZOrderingUtil.convertBytesToLong((byte[]) row.get(index)); + } + return null; + }).filter(f -> f != null).collect(Collectors.toList()); + + byte[] hilbertValue = HilbertCurveUtils.indexBytes( + hilbertCurve, longList.stream().mapToLong(l -> l).toArray(), 63); + List<Object> values = new ArrayList<>(); + values.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava()); + values.add(hilbertValue); + return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(values)); + } + }; + }).sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true, fileNum); + } + + public static Dataset<Row> createOptimizedDataFrameByMapValue(Dataset<Row> df, String sortCols, int fileNum, String sortMode) { + if (sortCols == null || sortCols.isEmpty() || fileNum <= 0) { + return df; + } + return createOptimizedDataFrameByMapValue(df, + Arrays.stream(sortCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum, sortMode); + } + + public static Dataset<Row> createOptimizeDataFrameBySample(Dataset<Row> df, List<String> zCols, int fileNum, String sortMode) { + return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum, sortMode); + } + + public static Dataset<Row> createOptimizeDataFrameBySample(Dataset<Row> df, String zCols, int fileNum, String sortMode) { + if (zCols == null || zCols.isEmpty() || fileNum <= 0) { + return df; + } + return createOptimizeDataFrameBySample(df, Arrays.stream(zCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum, sortMode); + } + + /** + * Parse min/max statistics stored in parquet footers for z-sort cols. + * no support collect statistics from timeStampType, since parquet file has not collect the statistics for timeStampType. + * to do adapt for rfc-27 + * + * @param df a spark DataFrame holds parquet files to be read. + * @param cols z-sort cols + * @return a dataFrame holds all statistics info. 
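A call-site sketch, assuming df was read from the table's parquet files; per the schema assembled below, the result holds one row per file plus _minValue, _maxValue and _num_nulls columns for each requested field.

Dataset<Row> stats = OrderingIndexHelper.getMinMaxValue(df, "c1,c2");
// columns: file, c1_minValue, c1_maxValue, c1_num_nulls, c2_minValue, c2_maxValue, c2_num_nulls
stats.show(false);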
+ */ + public static Dataset<Row> getMinMaxValue(Dataset<Row> df, List<String> cols) { + Map<String, DataType> columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e.dataType())); + + List<String> scanFiles = Arrays.asList(df.inputFiles()); + SparkContext sc = df.sparkSession().sparkContext(); + JavaSparkContext jsc = new JavaSparkContext(sc); + + SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration()); + int numParallelism = (scanFiles.size() / 3 + 1); + List<HoodieColumnRangeMetadata<Comparable>> colMinMaxInfos; + String previousJobDescription = sc.getLocalProperty(SPARK_JOB_DESCRIPTION); + try { + jsc.setJobDescription("Listing parquet column statistics"); + colMinMaxInfos = jsc.parallelize(scanFiles, numParallelism).mapPartitions(paths -> { + Configuration conf = serializableConfiguration.value(); + ParquetUtils parquetUtils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET); + List<Collection<HoodieColumnRangeMetadata<Comparable>>> results = new ArrayList<>(); + while (paths.hasNext()) { + String path = paths.next(); + results.add(parquetUtils.readRangeFromParquetMetadata(conf, new Path(path), cols)); + } + return results.stream().flatMap(f -> f.stream()).iterator(); + }).collect(); + } finally { + jsc.setJobDescription(previousJobDescription); + } + + Map<String, List<HoodieColumnRangeMetadata<Comparable>>> fileToStatsListMap = colMinMaxInfos.stream().collect(Collectors.groupingBy(e -> e.getFilePath())); + JavaRDD<Row> allMetaDataRDD = jsc.parallelize(new ArrayList<>(fileToStatsListMap.values()), 1).map(f -> { + int colSize = f.size(); + if (colSize == 0) { + return null; + } else { + List<Object> rows = new ArrayList<>(); + rows.add(f.get(0).getFilePath()); + cols.stream().forEach(col -> { + HoodieColumnRangeMetadata<Comparable> currentColRangeMetaData = + f.stream().filter(s -> s.getColumnName().trim().equalsIgnoreCase(col)).findFirst().orElse(null); + DataType colType = columnsMap.get(col); + if (currentColRangeMetaData == null || colType == null) { + throw new HoodieException(String.format("cannot collect min/max statistics for col: %s", col)); + } + if (colType instanceof IntegerType) { + rows.add(currentColRangeMetaData.getMinValue()); + rows.add(currentColRangeMetaData.getMaxValue()); + } else if (colType instanceof DoubleType) { + rows.add(currentColRangeMetaData.getMinValue()); + rows.add(currentColRangeMetaData.getMaxValue()); + } else if (colType instanceof StringType) { + rows.add(currentColRangeMetaData.getMinValue().toString()); + rows.add(currentColRangeMetaData.getMaxValue().toString()); + } else if (colType instanceof DecimalType) { + rows.add(new BigDecimal(currentColRangeMetaData.getMinValue().toString())); + rows.add(new BigDecimal(currentColRangeMetaData.getMaxValue().toString())); + } else if (colType instanceof DateType) { + rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMinValue().toString())); + rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMaxValue().toString())); + } else if (colType instanceof LongType) { + rows.add(currentColRangeMetaData.getMinValue()); + rows.add(currentColRangeMetaData.getMaxValue()); + } else if (colType instanceof ShortType) { + rows.add(Short.parseShort(currentColRangeMetaData.getMinValue().toString())); + rows.add(Short.parseShort(currentColRangeMetaData.getMaxValue().toString())); + } else if (colType instanceof FloatType) { + rows.add(currentColRangeMetaData.getMinValue()); + 
rows.add(currentColRangeMetaData.getMaxValue()); + } else if (colType instanceof BinaryType) { + rows.add(((Binary)currentColRangeMetaData.getMinValue()).getBytes()); + rows.add(((Binary)currentColRangeMetaData.getMaxValue()).getBytes()); + } else if (colType instanceof BooleanType) { + rows.add(currentColRangeMetaData.getMinValue()); + rows.add(currentColRangeMetaData.getMaxValue()); + } else if (colType instanceof ByteType) { + rows.add(Byte.valueOf(currentColRangeMetaData.getMinValue().toString())); + rows.add(Byte.valueOf(currentColRangeMetaData.getMaxValue().toString())); + } else { + throw new HoodieException(String.format("Not support type: %s", colType)); + } + rows.add(currentColRangeMetaData.getNumNulls()); + }); + return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(rows)); + } + }).filter(f -> f != null); + List<StructField> allMetaDataSchema = new ArrayList<>(); + allMetaDataSchema.add(new StructField("file", StringType$.MODULE$, true, Metadata.empty())); + cols.forEach(col -> { + allMetaDataSchema.add(new StructField(col + "_minValue", columnsMap.get(col), true, Metadata.empty())); + allMetaDataSchema.add(new StructField(col + "_maxValue", columnsMap.get(col), true, Metadata.empty())); + allMetaDataSchema.add(new StructField(col + "_num_nulls", LongType$.MODULE$, true, Metadata.empty())); + }); + return df.sparkSession().createDataFrame(allMetaDataRDD, StructType$.MODULE$.apply(allMetaDataSchema)); + } + + public static Dataset<Row> getMinMaxValue(Dataset<Row> df, String cols) { + List<String> rawCols = Arrays.asList(cols.split(",")).stream().map(f -> f.trim()).collect(Collectors.toList()); + return getMinMaxValue(df, rawCols); + } + + /** + * Update statistics info. + * this method will update old index table by full out join, + * and save the updated table into a new index table based on commitTime. + * old index table will be cleaned also. + * + * @param df a spark DataFrame holds parquet files to be read. + * @param cols z-sort cols. + * @param indexPath index store path. + * @param commitTime current operation commitTime. + * @param validateCommits all validate commits for current table. 
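A call-site sketch of saveStatisticsInfo, assuming df holds the rows written by the current commit; the index path, commit times and column list here are placeholders, not values from the patch.

OrderingIndexHelper.saveStatisticsInfo(
    df,                                   // rows written by the current commit
    "c1,c2",                              // statistics columns
    "/tmp/hoodie_table/index",            // placeholder index store path
    "20211127152019",                     // commit time the new index table is named after
    Arrays.asList("20211127151000", "20211127152019")); // commits still valid on the timeline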
+ * @return + */ + public static void saveStatisticsInfo(Dataset<Row> df, String cols, String indexPath, String commitTime, List<String> validateCommits) { + Path savePath = new Path(indexPath, commitTime); + SparkSession spark = df.sparkSession(); + FileSystem fs = FSUtils.getFs(indexPath, spark.sparkContext().hadoopConfiguration()); + Dataset<Row> statisticsDF = OrderingIndexHelper.getMinMaxValue(df, cols); + // try to find last validate index table from index path + try { + // If there's currently no index, create one + if (!fs.exists(new Path(indexPath))) { + statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString()); + return; + } + + // Otherwise, clean up all indexes but the most recent one + + List<String> allIndexTables = Arrays + .stream(fs.listStatus(new Path(indexPath))).filter(f -> f.isDirectory()).map(f -> f.getPath().getName()).collect(Collectors.toList()); + List<String> candidateIndexTables = allIndexTables.stream().filter(f -> validateCommits.contains(f)).sorted().collect(Collectors.toList()); + List<String> residualTables = allIndexTables.stream().filter(f -> !validateCommits.contains(f)).collect(Collectors.toList()); + Option<Dataset> latestIndexData = Option.empty(); + if (!candidateIndexTables.isEmpty()) { + latestIndexData = Option.of(spark.read().load(new Path(indexPath, candidateIndexTables.get(candidateIndexTables.size() - 1)).toString())); + // clean old index table, keep at most 1 index table. + candidateIndexTables.remove(candidateIndexTables.size() - 1); + candidateIndexTables.forEach(f -> { + try { + fs.delete(new Path(indexPath, f)); + } catch (IOException ie) { + throw new HoodieException(ie); + } + }); + } + + // clean residualTables + // retried cluster operations at the same instant time is also considered, + // the residual files produced by retried are cleaned up before save statistics + // save statistics info to index table which named commitTime + residualTables.forEach(f -> { + try { + fs.delete(new Path(indexPath, f)); + } catch (IOException ie) { + throw new HoodieException(ie); + } + }); + + if (latestIndexData.isPresent() && latestIndexData.get().schema().equals(statisticsDF.schema())) { + // update the statistics info + String originalTable = "indexTable_" + java.util.UUID.randomUUID().toString().replace("-", ""); + String updateTable = "updateTable_" + java.util.UUID.randomUUID().toString().replace("-", ""); + latestIndexData.get().registerTempTable(originalTable); + statisticsDF.registerTempTable(updateTable); + // update table by full out join + List columns = Arrays.asList(statisticsDF.schema().fieldNames()); + spark.sql(ZOrderingIndexHelper.createIndexMergeSql(originalTable, updateTable, columns)).repartition(1).write().save(savePath.toString()); + } else { + statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString()); + } + } catch (IOException e) { + throw new HoodieException(e); + } + } +} diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala index c392f12..a168e55 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala @@ -19,15 +19,16 @@ package org.apache.spark.sql.hudi.execution import org.apache.hudi.config.HoodieClusteringConfig +import 
org.apache.hudi.optimize.{HilbertCurveUtils, ZOrderingUtil} import org.apache.spark.rdd.{PartitionPruningRDD, RDD} -import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, BoundReference, SortOrder, UnsafeProjection, UnsafeRow} -import org.apache.hudi.optimize.ZOrderingUtil -import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, BoundReference, SortOrder, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.types._ +import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.util.MutablePair import org.apache.spark.util.random.SamplingUtils +import org.davidmoten.hilbert.HilbertCurve import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -35,10 +36,10 @@ import scala.reflect.{ClassTag, classTag} import scala.util.hashing.byteswap32 class RangeSample[K: ClassTag, V]( - zEncodeNum: Int, - rdd: RDD[_ <: Product2[K, V]], - private var ascend: Boolean = true, - val samplePointsPerPartitionHint: Int = 20) extends Serializable { + zEncodeNum: Int, + rdd: RDD[_ <: Product2[K, V]], + private var ascend: Boolean = true, + val samplePointsPerPartitionHint: Int = 20) extends Serializable { // We allow zEncodeNum = 0, which happens when sorting an empty RDD under the default settings. require(zEncodeNum >= 0, s"Number of zEncodeNum cannot be negative but found $zEncodeNum.") @@ -335,16 +336,21 @@ object RangeSampleSort { } /** - * create z-order DataFrame by sample - * first, sample origin data to get z-cols bounds, then create z-order DataFrame + * create optimize DataFrame by sample + * first, sample origin data to get order-cols bounds, then apply sort to produce DataFrame * support all type data. 
- * this method need more resource and cost more time than createZIndexedDataFrameByMapValue + * this method need more resource and cost more time than createOptimizedDataFrameByMapValue */ - def sortDataFrameBySample(df: DataFrame, zCols: Seq[String], fileNum: Int): DataFrame = { + def sortDataFrameBySample(df: DataFrame, zCols: Seq[String], fileNum: Int, sortMode: String): DataFrame = { val spark = df.sparkSession val columnsMap = df.schema.fields.map(item => (item.name, item)).toMap val fieldNum = df.schema.fields.length val checkCols = zCols.filter(col => columnsMap(col) != null) + val useHilbert = sortMode match { + case "hilbert" => true + case "z-order" => false + case other => throw new IllegalArgumentException(s"new only support z-order/hilbert optimize but find: ${other}") + } if (zCols.isEmpty || checkCols.isEmpty) { df @@ -366,7 +372,7 @@ object RangeSampleSort { }.filter(_._1 != -1) // Complex type found, use createZIndexedDataFrameByRange if (zFields.length != zCols.length) { - return sortDataFrameBySampleSupportAllTypes(df, zCols, fieldNum) + return sortDataFrameBySampleSupportAllTypes(df, zCols, fileNum) } val rawRdd = df.rdd @@ -441,6 +447,7 @@ object RangeSampleSort { val boundBroadCast = spark.sparkContext.broadcast(expandSampleBoundsWithFactor) val indexRdd = rawRdd.mapPartitions { iter => + val hilbertCurve = if (useHilbert) Some(HilbertCurve.bits(32).dimensions(zFields.length)) else None val expandBoundsWithFactor = boundBroadCast.value val maxBoundNum = expandBoundsWithFactor.map(_._1.length).max val longDecisionBound = new RawDecisionBound(Ordering[Long]) @@ -507,17 +514,21 @@ object RangeSampleSort { case _ => -1 } - }.filter(v => v != -1).map(ZOrderingUtil.intTo8Byte(_)).toArray - val zValues = ZOrderingUtil.interleaving(values, 8) - Row.fromSeq(row.toSeq ++ Seq(zValues)) + }.filter(v => v != -1) + val mapValues = if (hilbertCurve.isDefined) { + HilbertCurveUtils.indexBytes(hilbertCurve.get, values.map(_.toLong).toArray, 32) + } else { + ZOrderingUtil.interleaving(values.map(ZOrderingUtil.intTo8Byte(_)).toArray, 8) + } + Row.fromSeq(row.toSeq ++ Seq(mapValues)) } }.sortBy(x => ZorderingBinarySort(x.getAs[Array[Byte]](fieldNum)), numPartitions = fileNum) val newDF = df.sparkSession.createDataFrame(indexRdd, StructType( df.schema.fields ++ Seq( - StructField(s"zindex", + StructField(s"index", BinaryType, false)) )) - newDF.drop("zindex") + newDF.drop("index") } } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala new file mode 100644 index 0000000..4b7864f --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hadoop.fs.Path +import org.apache.hudi.common.model.HoodieFileFormat +import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings +import org.apache.hudi.common.util.{BaseFileUtils, ParquetUtils} +import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig} +import org.apache.hudi.testutils.HoodieClientTestBase +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions} +import org.apache.spark.OrderingIndexHelper +import org.apache.spark.sql._ +import org.apache.spark.sql.types._ +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.Arguments.arguments +import org.junit.jupiter.params.provider.{Arguments, MethodSource} + +import java.sql.{Date, Timestamp} +import scala.collection.JavaConversions._ +import scala.util.Random + +@Tag("functional") +class TestTableLayoutOptimization extends HoodieClientTestBase { + var spark: SparkSession = _ + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + "hoodie.bulkinsert.shuffle.parallelism" -> "4", + DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "partition", + DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "timestamp", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test" + ) + + @BeforeEach override def setUp() { + initPath() + initSparkContexts() + spark = sqlContext.sparkSession + initTestDataGenerator() + initFileSystem() + } + + @AfterEach override def tearDown() = { + cleanupSparkContexts() + cleanupTestDataGenerator() + cleanupFileSystem() + } + + @ParameterizedTest + @MethodSource(Array("testLayOutParameter")) + def testOptimizewithClustering(tableType: String, optimizeMode: String): Unit = { + val targetRecordsCount = 10000 + // Bulk Insert Operation + val records = recordsToStrings(dataGen.generateInserts("001", targetRecordsCount)).toList + val writeDf: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records, 2)) + + writeDf.write.format("org.apache.hudi") + .options(commonOpts) + .option("hoodie.compact.inline", "false") + .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType) + // option for clustering + .option("hoodie.parquet.small.file.limit", "0") + .option("hoodie.clustering.inline", "true") + .option("hoodie.clustering.inline.max.commits", "1") + .option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824") + .option("hoodie.clustering.plan.strategy.small.file.limit", "629145600") + .option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString) + .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(64 * 1024 * 1024L)) + .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true") + 
.option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key(), optimizeMode) + .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "begin_lat, begin_lon") + .mode(SaveMode.Overwrite) + .save(basePath) + + val readDf = + spark.read + .format("hudi") + .load(basePath) + + val readDfSkip = + spark.read + .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true") + .format("hudi") + .load(basePath) + + assertEquals(targetRecordsCount, readDf.count()) + assertEquals(targetRecordsCount, readDfSkip.count()) + + readDf.createOrReplaceTempView("hudi_snapshot_raw") + readDfSkip.createOrReplaceTempView("hudi_snapshot_skipping") + + def select(tableName: String) = + spark.sql(s"SELECT * FROM $tableName WHERE begin_lat >= 0.49 AND begin_lat < 0.51 AND begin_lon >= 0.49 AND begin_lon < 0.51") + + assertRowsMatch( + select("hudi_snapshot_raw"), + select("hudi_snapshot_skipping") + ) + } + + def assertRowsMatch(one: DataFrame, other: DataFrame) = { + val rows = one.count() + assert(rows == other.count() && one.intersect(other).count() == rows) + } + + @Test + def testCollectMinMaxStatistics(): Unit = { + val testPath = new Path(System.getProperty("java.io.tmpdir"), "minMax") + val statisticPath = new Path(System.getProperty("java.io.tmpdir"), "stat") + val fs = testPath.getFileSystem(spark.sparkContext.hadoopConfiguration) + val complexDataFrame = createComplexDataFrame(spark) + complexDataFrame.repartition(3).write.mode("overwrite").save(testPath.toString) + val df = spark.read.load(testPath.toString) + try { + // test z-order/hilbert sort for all primitive type + // shoud not throw exception. + OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1,c2,c3,c5,c6,c7,c8", 20, "hilbert").show(1) + OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1,c2,c3,c5,c6,c7,c8", 20, "z-order").show(1) + OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1,c2,c3,c5,c6,c7,c8", 20, "hilbert").show(1) + OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1,c2,c3,c5,c6,c7,c8", 20, "z-order").show(1) + try { + // do not support TimeStampType, so if we collect statistics for c4, should throw exception + val colDf = OrderingIndexHelper.getMinMaxValue(df, "c1,c2,c3,c5,c6,c7,c8") + colDf.cache() + assertEquals(colDf.count(), 3) + assertEquals(colDf.take(1)(0).length, 22) + colDf.unpersist() + // try to save statistics + OrderingIndexHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "2", Seq("0", "1")) + // save again + OrderingIndexHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "3", Seq("0", "1", "2")) + // test old index table clean + OrderingIndexHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "4", Seq("0", "1", "3")) + assertEquals(!fs.exists(new Path(statisticPath, "2")), true) + assertEquals(fs.exists(new Path(statisticPath, "3")), true) + // test to save different index, new index on ("c1,c6,c7,c8") should be successfully saved. + OrderingIndexHelper.saveStatisticsInfo(df, "c1,c6,c7,c8", statisticPath.toString, "5", Seq("0", "1", "3", "4")) + assertEquals(fs.exists(new Path(statisticPath, "5")), true) + } finally { + if (fs.exists(testPath)) fs.delete(testPath) + if (fs.exists(statisticPath)) fs.delete(statisticPath) + } + } + } + + // test collect min-max statistic info for DateType in the case of multithreading. + // parquet will give a wrong statistic result for DateType in the case of multithreading. 
+ @Test + def testMultiThreadParquetFooterReadForDateType(): Unit = { + // create parquet file with DateType + val rdd = spark.sparkContext.parallelize(0 to 100, 1) + .map(item => RowFactory.create(Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}"))) + val df = spark.createDataFrame(rdd, new StructType().add("id", DateType)) + val testPath = new Path(System.getProperty("java.io.tmpdir"), "testCollectDateType") + val conf = spark.sparkContext.hadoopConfiguration + val cols = new java.util.ArrayList[String] + cols.add("id") + try { + df.repartition(3).write.mode("overwrite").save(testPath.toString) + val inputFiles = spark.read.load(testPath.toString).inputFiles.sortBy(x => x) + + val realResult = new Array[(String, String)](3) + inputFiles.zipWithIndex.foreach { case (f, index) => + val fileUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils] + val res = fileUtils.readRangeFromParquetMetadata(conf, new Path(f), cols).iterator().next() + realResult(index) = (res.getMinValue.toString, res.getMaxValue.toString) + } + + // multi thread read with no lock + val resUseLock = new Array[(String, String)](3) + inputFiles.zipWithIndex.par.foreach { case (f, index) => + val fileUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils] + val res = fileUtils.readRangeFromParquetMetadata(conf, new Path(f), cols).iterator().next() + resUseLock(index) = (res.getMinValue.toString, res.getMaxValue.toString) + } + + // check resUseNoLock, + // We can't guarantee that there must be problems in the case of multithreading. + // In order to make ut pass smoothly, we will not check resUseNoLock. + // check resUseLock + // should pass assert + realResult.zip(resUseLock).foreach { case (realValue, testValue) => + assert(realValue == testValue, s" expect realValue: ${realValue} but find ${testValue}") + } + } finally { + if (fs.exists(testPath)) fs.delete(testPath) + } + } + + def createComplexDataFrame(spark: SparkSession): DataFrame = { + val schema = new StructType() + .add("c1", IntegerType) + .add("c2", StringType) + .add("c3", DecimalType(9,3)) + .add("c4", TimestampType) + .add("c5", ShortType) + .add("c6", DateType) + .add("c7", BinaryType) + .add("c8", ByteType) + + val rdd = spark.sparkContext.parallelize(0 to 1000, 1).map { item => + val c1 = Integer.valueOf(item) + val c2 = s" ${item}sdc" + val c3 = new java.math.BigDecimal(s"${Random.nextInt(1000)}.${item}") + val c4 = new Timestamp(System.currentTimeMillis()) + val c5 = java.lang.Short.valueOf(s"${(item + 16) /10}") + val c6 = Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}") + val c7 = Array(item).map(_.toByte) + val c8 = java.lang.Byte.valueOf("9") + + RowFactory.create(c1, c2, c3, c4, c5, c6, c7, c8) + } + spark.createDataFrame(rdd, schema) + } +} + +object TestTableLayoutOptimization { + def testLayOutParameter(): java.util.stream.Stream[Arguments] = { + java.util.stream.Stream.of( + arguments("COPY_ON_WRITE", "hilbert"), + arguments("COPY_ON_WRITE", "z-order"), + arguments("MERGE_ON_READ", "hilbert"), + arguments("MERGE_ON_READ", "z-order") + ) + } +} + diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala new file mode 100644 index 0000000..c8263b3 --- /dev/null +++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +import org.apache.hadoop.fs.Path +import org.apache.spark.OrderingIndexHelper +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.hudi.TestHoodieSqlBase + +import scala.util.Random + +object SpaceCurveOptimizeBenchMark extends TestHoodieSqlBase { + + def getSkippingPercent(tableName: String, co1: String, co2: String, value1: Int, value2: Int): Unit= { + val minMax = OrderingIndexHelper + .getMinMaxValue(spark.sql(s"select * from ${tableName}"), s"${co1}, ${co2}") + .collect().map(f => (f.getInt(1), f.getInt(2), f.getInt(4), f.getInt(5))) + var c = 0 + for (elem <- minMax) { + if ((elem._1 <= value1 && elem._2 >= value1) || (elem._3 <= value2 && elem._4 >= value2)) { + c = c + 1 + } + } + + val p = c / minMax.size.toDouble + println(s"for table ${tableName} with query filter: ${co1} = ${value1} or ${co2} = ${value2} we can achieve skipping percent ${1.0 - p}") + } + + /* + for table table_z_sort_byMap with query filter: c1_int = 500000 or c2_int = 500000 we can achieve skipping percent 0.8 + for table table_z_sort_bySample with query filter: c1_int = 500000 or c2_int = 500000 we can achieve skipping percent 0.77 + for table table_hilbert_sort_byMap with query filter: c1_int = 500000 or c2_int = 500000 we can achieve skipping percent 0.855 + for table table_hilbert_sort_bySample with query filter: c1_int = 500000 or c2_int = 500000 we can achieve skipping percent 0.83 + */ + def runNormalTableSkippingBenchMark(): Unit = { + withTempDir { f => + withTempTable("table_z_sort_byMap", "table_z_sort_bySample", "table_hilbert_sort_byMap", "table_hilbert_sort_bySample") { + prepareInterTypeTable(new Path(f.getAbsolutePath), 1000000) + // choose median value as filter condition. 
+ // the median value of c1_int is 500000 + // the median value of c2_int is 500000 + getSkippingPercent("table_z_sort_byMap", "c1_int", "c2_int", 500000, 500000) + getSkippingPercent("table_z_sort_bySample", "c1_int", "c2_int", 500000, 500000) + getSkippingPercent("table_hilbert_sort_byMap", "c1_int", "c2_int", 500000, 500000) + getSkippingPercent("table_hilbert_sort_bySample", "c1_int", "c2_int", 500000, 500000) + } + } + } + + /* + for table table_z_sort_byMap_skew with query filter: c1_int = 5000 or c2_int = 500000 we can achieve skipping percent 0.0 + for table table_z_sort_bySample_skew with query filter: c1_int = 5000 or c2_int = 500000 we can achieve skipping percent 0.78 + for table table_hilbert_sort_byMap_skew with query filter: c1_int = 5000 or c2_int = 500000 we can achieve skipping percent 0.05500000000000005 + for table table_hilbert_sort_bySample_skew with query filter: c1_int = 5000 or c2_int = 500000 we can achieve skipping percent 0.84 + */ + def runSkewTableSkippingBenchMark(): Unit = { + withTempDir { f => + withTempTable("table_z_sort_byMap_skew", "table_z_sort_bySample_skew", "table_hilbert_sort_byMap_skew", "table_hilbert_sort_bySample_skew") { + // prepare skewed table. + prepareInterTypeTable(new Path(f.getAbsolutePath), 1000000, 10000, 1000000, true) + // choose median value as filter condition. + // the median value of c1_int is 5000 + // the median value of c2_int is 500000 + getSkippingPercent("table_z_sort_byMap_skew", "c1_int", "c2_int", 5000, 500000) + getSkippingPercent("table_z_sort_bySample_skew", "c1_int", "c2_int", 5000, 500000) + getSkippingPercent("table_hilbert_sort_byMap_skew", "c1_int", "c2_int", 5000, 500000) + getSkippingPercent("table_hilbert_sort_bySample_skew", "c1_int", "c2_int", 5000, 500000) + } + } + } + + def main(args: Array[String]): Unit = { + runNormalTableSkippingBenchMark() + runSkewTableSkippingBenchMark() + } + + def withTempTable(tableNames: String*)(f: => Unit): Unit = { + try f finally tableNames.foreach(spark.catalog.dropTempView) + } + + def prepareInterTypeTable(tablePath: Path, numRows: Int, col1Range: Int = 1000000, col2Range: Int = 1000000, skewed: Boolean = false): Unit = { + import spark.implicits._ + val df = spark.range(numRows).map(_ => (Random.nextInt(col1Range), Random.nextInt(col2Range))).toDF("c1_int", "c2_int") + val dfOptimizeByMap = OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1_int, c2_int", 200, "z-order") + val dfOptimizeBySample = OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1_int, c2_int", 200, "z-order") + + val dfHilbertOptimizeByMap = OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1_int, c2_int", 200, "hilbert") + val dfHilbertOptimizeBySample = OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1_int, c2_int", 200, "hilbert") + + saveAsTable(dfOptimizeByMap, tablePath, if (skewed) "z_sort_byMap_skew" else "z_sort_byMap") + saveAsTable(dfOptimizeBySample, tablePath, if (skewed) "z_sort_bySample_skew" else "z_sort_bySample") + saveAsTable(dfHilbertOptimizeByMap, tablePath, if (skewed) "hilbert_sort_byMap_skew" else "hilbert_sort_byMap") + saveAsTable(dfHilbertOptimizeBySample, tablePath, if (skewed) "hilbert_sort_bySample_skew" else "hilbert_sort_bySample") + } + + def saveAsTable(df: DataFrame, savePath: Path, suffix: String): Unit = { + + df.write.mode("overwrite").save(new Path(savePath, suffix).toString) + spark.read.parquet(new Path(savePath, suffix).toString).createOrReplaceTempView("table_" + suffix) + } +} + diff --git 
a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml index 5f45fdd..1766a98 100644 --- a/packaging/hudi-flink-bundle/pom.xml +++ b/packaging/hudi-flink-bundle/pom.xml @@ -107,6 +107,8 @@ <include>com.fasterxml.jackson.core:jackson-databind</include> <include>com.fasterxml.jackson.core:jackson-core</include> + <include>com.github.davidmoten:guava-mini</include> + <include>com.github.davidmoten:hilbert-curve</include> <include>com.twitter:bijection-avro_${scala.binary.version}</include> <include>com.twitter:bijection-core_${scala.binary.version}</include> <include>io.dropwizard.metrics:metrics-core</include> diff --git a/packaging/hudi-kafka-connect-bundle/pom.xml b/packaging/hudi-kafka-connect-bundle/pom.xml index cd7b151..b8a4bcd 100644 --- a/packaging/hudi-kafka-connect-bundle/pom.xml +++ b/packaging/hudi-kafka-connect-bundle/pom.xml @@ -89,6 +89,8 @@ <include>org.apache.flink:flink-core</include> <include>org.apache.flink:flink-hadoop-compatibility_${scala.binary.version}</include> + <include>com.github.davidmoten:guava-mini</include> + <include>com.github.davidmoten:hilbert-curve</include> <include>com.yammer.metrics:metrics-core</include> <include>com.beust:jcommander</include> <include>io.javalin:javalin</include> diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml index 32a9abf..02d11be 100644 --- a/packaging/hudi-spark-bundle/pom.xml +++ b/packaging/hudi-spark-bundle/pom.xml @@ -88,6 +88,8 @@ <include>org.antlr:stringtemplate</include> <include>org.apache.parquet:parquet-avro</include> + <include>com.github.davidmoten:guava-mini</include> + <include>com.github.davidmoten:hilbert-curve</include> <include>com.twitter:bijection-avro_${scala.binary.version}</include> <include>com.twitter:bijection-core_${scala.binary.version}</include> <include>io.dropwizard.metrics:metrics-core</include> diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml index 568e9d5..798c27f 100644 --- a/packaging/hudi-utilities-bundle/pom.xml +++ b/packaging/hudi-utilities-bundle/pom.xml @@ -117,6 +117,8 @@ <include>com.amazonaws:aws-java-sdk-dynamodb</include> <include>com.amazonaws:aws-java-sdk-core</include> + <include>com.github.davidmoten:guava-mini</include> + <include>com.github.davidmoten:hilbert-curve</include> <include>com.twitter:bijection-avro_${scala.binary.version}</include> <include>com.twitter:bijection-core_${scala.binary.version}</include> <include>io.confluent:kafka-avro-serializer</include>
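For completeness, a hedged sketch of how a writer switches on the new strategy, mirroring the options exercised in TestTableLayoutOptimization above; the DataFrame, base path and sort columns are placeholders, and the usual record key / precombine / table name options are omitted.

df.write().format("org.apache.hudi")
    .option("hoodie.datasource.write.operation", "bulk_insert")
    .option("hoodie.clustering.inline", "true")
    .option("hoodie.clustering.inline.max.commits", "1")
    .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key(), "true")
    .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key(), "hilbert")   // or "z-order"
    .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key(), "begin_lat, begin_lon")
    // plus the usual record key / precombine / partition path / table name options
    .mode(SaveMode.Overwrite)
    .save(basePath);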