This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 780a2ac  [HUDI-2102] Support hilbert curve for hudi (#3952)
780a2ac is described below

commit 780a2ac5b208b06f53ee64186b5392ebc37eadbc
Author: xiarixiaoyao <mengtao0...@qq.com>
AuthorDate: Sat Nov 27 15:20:19 2021 +0800

    [HUDI-2102] Support hilbert curve for hudi (#3952)
    
    Co-authored-by: Y Ethan Guo <ethan.guoyi...@gmail.com>
---
 NOTICE                                             |   6 +
 hudi-client/hudi-client-common/pom.xml             |   7 +
 .../apache/hudi/config/HoodieClusteringConfig.java |  28 ++
 .../apache/hudi/optimize/HilbertCurveUtils.java    |  52 +++
 .../org/apache/hudi/optimize/ZOrderingUtil.java    |   7 +-
 .../hudi/optimize/TestHilbertCurveUtils.java       |  38 ++
 .../apache/hudi/optimize/TestZOrderingUtil.java    |  25 ++
 ...RDDSpatialCurveOptimizationSortPartitioner.java |   8 +-
 .../hudi/index/zorder/ZOrderingIndexHelper.java    |  17 +-
 .../hudi/table/HoodieSparkCopyOnWriteTable.java    |   8 +-
 .../java/org/apache/spark/OrderingIndexHelper.java | 430 +++++++++++++++++++++
 .../spark/sql/hudi/execution/RangeSample.scala     |  45 ++-
 .../functional/TestTableLayoutOptimization.scala   | 248 ++++++++++++
 .../benchmark/SpaceCurveOptimizeBenchMark.scala    | 118 ++++++
 packaging/hudi-flink-bundle/pom.xml                |   2 +
 packaging/hudi-kafka-connect-bundle/pom.xml        |   2 +
 packaging/hudi-spark-bundle/pom.xml                |   2 +
 packaging/hudi-utilities-bundle/pom.xml            |   2 +
 18 files changed, 1015 insertions(+), 30 deletions(-)
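
For context, the feature added here is driven entirely by clustering configuration. Below is a minimal sketch of enabling Hilbert-curve layout optimization on a bulk insert; it is illustrative only (not part of this commit), mirrors the options exercised in TestTableLayoutOptimization further down, and `inputDF`/`basePath` are placeholders:

    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
    import org.apache.spark.sql.SaveMode

    inputDF.write.format("hudi")
      .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
      .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
      // trigger inline clustering on every commit so the layout optimization runs immediately
      .option("hoodie.parquet.small.file.limit", "0")
      .option("hoodie.clustering.inline", "true")
      .option("hoodie.clustering.inline.max.commits", "1")
      .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true")
      .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key, "hilbert") // or "z-order"
      .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "begin_lat, begin_lon")
      .mode(SaveMode.Overwrite)
      .save(basePath)
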

diff --git a/NOTICE b/NOTICE
index 9b24933..437b974 100644
--- a/NOTICE
+++ b/NOTICE
@@ -159,3 +159,9 @@ its NOTICE file:
   This product includes software developed at
   StreamSets (http://www.streamsets.com/).
 
+--------------------------------------------------------------------------------
+
+This product includes code from hilbert-curve project
+ * Copyright https://github.com/davidmoten/hilbert-curve
+ * Licensed under the Apache-2.0 License
+
diff --git a/hudi-client/hudi-client-common/pom.xml b/hudi-client/hudi-client-common/pom.xml
index d30ee29..22ad8ec 100644
--- a/hudi-client/hudi-client-common/pom.xml
+++ b/hudi-client/hudi-client-common/pom.xml
@@ -64,6 +64,13 @@
       <artifactId>parquet-avro</artifactId>
     </dependency>
 
+    <!-- Hilbert Curve -->
+    <dependency>
+      <groupId>com.github.davidmoten</groupId>
+      <artifactId>hilbert-curve</artifactId>
+      <version>0.2.2</version>
+    </dependency>
+
     <!-- Dropwizard Metrics -->
     <dependency>
       <groupId>io.dropwizard.metrics</groupId>
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index 9a10965..676f2ff 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -542,4 +542,32 @@ public class HoodieClusteringConfig extends HoodieConfig {
       }
     }
   }
+
+  /**
+   * Strategy types for layout optimization of Hudi data.
+   */
+  public enum BuildLayoutOptimizationStrategy {
+    ZORDER("z-order"),
+    HILBERT("hilbert");
+    private final String value;
+
+    BuildLayoutOptimizationStrategy(String value) {
+      this.value = value;
+    }
+
+    public String toCustomString() {
+      return value;
+    }
+
+    public static BuildLayoutOptimizationStrategy fromValue(String value) {
+      switch (value.toLowerCase(Locale.ROOT)) {
+        case "z-order":
+          return ZORDER;
+        case "hilbert":
+          return HILBERT;
+        default:
+          throw new HoodieException("Invalid value of Type.");
+      }
+    }
+  }
 }
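
For illustration only (not part of the commit): how the strategy names map onto the enum added above; `fromValue` is case-insensitive because of the `Locale.ROOT` lower-casing.

    import org.apache.hudi.config.HoodieClusteringConfig.BuildLayoutOptimizationStrategy

    val strategy = BuildLayoutOptimizationStrategy.fromValue("HILBERT") // also accepts "hilbert"
    assert(strategy == BuildLayoutOptimizationStrategy.HILBERT)
    assert(strategy.toCustomString == "hilbert")
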
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurveUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurveUtils.java
new file mode 100644
index 0000000..0f216ab
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurveUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.optimize;
+
+import org.davidmoten.hilbert.HilbertCurve;
+
+import java.math.BigInteger;
+
+/**
+ * Utils for Hilbert Curve.
+ */
+public class HilbertCurveUtils {
+  public static byte[] indexBytes(HilbertCurve hilbertCurve, long[] points, int paddingNum) {
+    BigInteger index = hilbertCurve.index(points);
+    return paddingToNByte(index.toByteArray(), paddingNum);
+  }
+
+  public static byte[] paddingToNByte(byte[] a, int paddingNum) {
+    if (a.length == paddingNum) {
+      return a;
+    }
+    if (a.length > paddingNum) {
+      byte[] result = new byte[paddingNum];
+      System.arraycopy(a, 0, result, 0, paddingNum);
+      return result;
+    }
+    int paddingSize = paddingNum - a.length;
+    byte[] result = new byte[paddingNum];
+    for (int i = 0; i < paddingSize; i++) {
+      result[i] = 0;
+    }
+    System.arraycopy(a, 0, result, paddingSize, a.length);
+    return result;
+  }
+}
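
A quick illustration of how this utility is used later in the commit (sketch only, not part of the diff): each record's sort columns are first mapped to longs, then indexBytes turns them into a fixed-width, byte-comparable Hilbert key. Assuming the 63-bit curve that createHilbertSortedRDD builds, here for two dimensions:

    import org.apache.hudi.optimize.HilbertCurveUtils
    import org.davidmoten.hilbert.HilbertCurve

    val curve = HilbertCurve.bits(63).dimensions(2)
    // 63-byte key, left-padded so indexes of different magnitudes stay lexicographically comparable
    val key: Array[Byte] = HilbertCurveUtils.indexBytes(curve, Array(1L, 2L), 63)
    assert(key.length == 63)
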
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java
index 3aa8080..50827cc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java
@@ -176,9 +176,14 @@ public class ZOrderingUtil {
 
   public static Long convertStringToLong(String a) {
     byte[] bytes = utf8To8Byte(a);
+    return convertBytesToLong(bytes);
+  }
+
+  public static long convertBytesToLong(byte[] bytes) {
+    byte[] paddedBytes = paddingTo8Byte(bytes);
     long temp = 0L;
     for (int i = 7; i >= 0; i--) {
-      temp = temp | (((long)bytes[i] & 0xff) << (7 - i) * 8);
+      temp = temp | (((long) paddedBytes[i] & 0xff) << (7 - i) * 8);
     }
     return temp;
   }
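
Illustrative only: the new convertBytesToLong left-pads short inputs to 8 bytes and packs them big-endian, which is exactly what the added TestZOrderingUtil case below checks:

    import org.apache.hudi.optimize.ZOrderingUtil

    val v = ZOrderingUtil.convertBytesToLong(Array[Byte](2, 127))
    assert(v == 2 * 256 + 127) // 639, matching testConvertBytesToLongWithPadding
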
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestHilbertCurveUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestHilbertCurveUtils.java
new file mode 100644
index 0000000..5bb482e
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestHilbertCurveUtils.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.optimize;
+
+import org.davidmoten.hilbert.HilbertCurve;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHilbertCurveUtils {
+
+  private static final HilbertCurve INSTANCE = HilbertCurve.bits(5).dimensions(2);
+
+  @Test
+  public void testIndex() {
+    long[] t = {1, 2};
+    assertEquals(13, INSTANCE.index(t).intValue());
+    long[] t1 = {0, 16};
+    assertEquals(256, INSTANCE.index(t1).intValue());
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java
index 7dab6c2..a22485f 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java
@@ -126,4 +126,29 @@ public class TestZOrderingUtil {
       this.originValue = originValue;
     }
   }
+
+  @Test
+  public void testConvertBytesToLong() {
+    long[] tests = new long[] {Long.MIN_VALUE, -1L, 0, 1L, Long.MAX_VALUE};
+    for (int i = 0; i < tests.length; i++) {
+      assertEquals(ZOrderingUtil.convertBytesToLong(convertLongToBytes(tests[i])), tests[i]);
+    }
+  }
+
+  @Test
+  public void testConvertBytesToLongWithPadding() {
+    byte[] bytes = new byte[2];
+    bytes[0] = 2;
+    bytes[1] = 127;
+    assertEquals(ZOrderingUtil.convertBytesToLong(bytes), 2 * 256 + 127);
+  }
+
+  private byte[] convertLongToBytes(long num) {
+    byte[] byteNum = new byte[8];
+    for (int i = 0; i < 8; i++) {
+      int offset = 64 - (i + 1) * 8;
+      byteNum[i] = (byte) ((num >> offset) & 0xff);
+    }
+    return byteNum;
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java
index 03fdf5a..51526fc 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java
@@ -33,7 +33,7 @@ import org.apache.hudi.table.BulkInsertPartitioner;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.index.zorder.ZOrderingIndexHelper;
+import org.apache.spark.OrderingIndexHelper;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -79,10 +79,12 @@ public class RDDSpatialCurveOptimizationSortPartitioner<T extends HoodieRecordPa
 
     switch (config.getLayoutOptimizationCurveBuildMethod()) {
       case DIRECT:
-        zDataFrame = ZOrderingIndexHelper.createZIndexedDataFrameByMapValue(originDF, config.getClusteringSortColumns(), numOutputGroups);
+        zDataFrame = OrderingIndexHelper
+            .createOptimizedDataFrameByMapValue(originDF, config.getClusteringSortColumns(), numOutputGroups, config.getLayoutOptimizationStrategy());
         break;
       case SAMPLE:
-        zDataFrame = ZOrderingIndexHelper.createZIndexedDataFrameBySample(originDF, config.getClusteringSortColumns(), numOutputGroups);
+        zDataFrame = OrderingIndexHelper
+            .createOptimizeDataFrameBySample(originDF, config.getClusteringSortColumns(), numOutputGroups, config.getLayoutOptimizationStrategy());
         break;
       default:
         throw new HoodieException("Not a valid build curve method for doWriteOperation: ");
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/zorder/ZOrderingIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/zorder/ZOrderingIndexHelper.java
index 248c15c..934d1b9 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/zorder/ZOrderingIndexHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/zorder/ZOrderingIndexHelper.java
@@ -18,17 +18,19 @@
 
 package org.apache.hudi.index.zorder;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.util.BaseFileUtils;
 import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.optimize.ZOrderingUtil;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.io.api.Binary;
@@ -62,10 +64,10 @@ import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.types.StructType$;
 import org.apache.spark.sql.types.TimestampType;
 import org.apache.spark.util.SerializableConfiguration;
-import scala.collection.JavaConversions;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.ArrayList;
@@ -77,6 +79,8 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import scala.collection.JavaConversions;
+
 import static org.apache.hudi.util.DataTypeUtils.areCompatible;
 
 public class ZOrderingIndexHelper {
@@ -189,7 +193,8 @@ public class ZOrderingIndexHelper {
   }
 
   public static Dataset<Row> createZIndexedDataFrameBySample(Dataset<Row> df, List<String> zCols, int fileNum) {
-    return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum);
+    return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum,
+        HoodieClusteringConfig.BuildLayoutOptimizationStrategy.ZORDER.toCustomString());
   }
 
   public static Dataset<Row> createZIndexedDataFrameBySample(Dataset<Row> df, String zCols, int fileNum) {
@@ -584,7 +589,7 @@ public class ZOrderingIndexHelper {
    * @VisibleForTesting
    */
   @Nonnull
-  static String createIndexMergeSql(
+  public static String createIndexMergeSql(
       @Nonnull String originalIndexTable,
       @Nonnull String newIndexTable,
       @Nonnull List<String> columns
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 280d24f..7d2fbd3 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.table;
 
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -49,6 +47,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.index.zorder.ZOrderingIndexHelper;
 import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.HoodieSortedMergeHandle;
@@ -76,12 +75,15 @@ import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
 import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
 import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
 import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.hudi.index.zorder.ZOrderingIndexHelper;
 import org.apache.spark.api.java.JavaRDD;
 
 import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java
new file mode 100644
index 0000000..67b1c67
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.zorder.ZOrderingIndexHelper;
+import org.apache.hudi.optimize.HilbertCurveUtils;
+import org.apache.hudi.optimize.ZOrderingUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.io.api.Binary;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.Row$;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.hudi.execution.RangeSampleSort$;
+import org.apache.spark.sql.hudi.execution.ZorderingBinarySort;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.BinaryType$;
+import org.apache.spark.sql.types.BooleanType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.FloatType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.LongType$;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StringType$;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType$;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.util.SerializableConfiguration;
+import org.davidmoten.hilbert.HilbertCurve;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConversions;
+
+public class OrderingIndexHelper {
+
+  private static final String SPARK_JOB_DESCRIPTION = "spark.job.description";
+
+  /**
+   * Create an optimized DataFrame directly.
+   * Only primitive column types are supported: long, int, short, double, float, string, timestamp, decimal, date, byte.
+   * This method is more efficient than createOptimizeDataFrameBySample.
+   *
+   * @param df       a Spark DataFrame holding the parquet files to be read.
+   * @param sortCols ordering columns for the curve
+   * @param fileNum  Spark partition number
+   * @param sortMode layout optimization strategy
+   * @return a DataFrame ordered by the curve.
+   */
+  public static Dataset<Row> createOptimizedDataFrameByMapValue(Dataset<Row> df, List<String> sortCols, int fileNum, String sortMode) {
+    Map<String, StructField> columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e));
+    int fieldNum = df.schema().fields().length;
+    List<String> checkCols = sortCols.stream().filter(f -> columnsMap.containsKey(f)).collect(Collectors.toList());
+    if (sortCols.size() != checkCols.size()) {
+      return df;
+    }
+    // only one column to sort; no need to build a space-filling curve
+    if (sortCols.size() == 1) {
+      return df.repartitionByRange(fieldNum, org.apache.spark.sql.functions.col(sortCols.get(0)));
+    }
+    Map<Integer, StructField> fieldMap = sortCols
+        .stream().collect(Collectors.toMap(e -> Arrays.asList(df.schema().fields()).indexOf(columnsMap.get(e)), e -> columnsMap.get(e)));
+    // do optimize
+    JavaRDD<Row> sortedRDD = null;
+    switch (HoodieClusteringConfig.BuildLayoutOptimizationStrategy.fromValue(sortMode)) {
+      case ZORDER:
+        sortedRDD = createZCurveSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, fileNum);
+        break;
+      case HILBERT:
+        sortedRDD = createHilbertSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, fileNum);
+        break;
+      default:
+        throw new IllegalArgumentException(String.format("only z-order/hilbert layout optimization is supported, but found: %s", sortMode));
+    }
+    // create new StructType
+    List<StructField> newFields = new ArrayList<>();
+    newFields.addAll(Arrays.asList(df.schema().fields()));
+    newFields.add(new StructField("Index", BinaryType$.MODULE$, true, Metadata.empty()));
+
+    // create new DataFrame
+    return df.sparkSession().createDataFrame(sortedRDD, StructType$.MODULE$.apply(newFields)).drop("Index");
+  }
+
+  private static JavaRDD<Row> createZCurveSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
+    return originRDD.map(row -> {
+      List<byte[]> zBytesList = fieldMap.entrySet().stream().map(entry -> {
+        int index = entry.getKey();
+        StructField field = entry.getValue();
+        DataType dataType = field.dataType();
+        if (dataType instanceof LongType) {
+          return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index));
+        } else if (dataType instanceof DoubleType) {
+          return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index));
+        } else if (dataType instanceof IntegerType) {
+          return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index));
+        } else if (dataType instanceof FloatType) {
+          return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index));
+        } else if (dataType instanceof StringType) {
+          return ZOrderingUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index));
+        } else if (dataType instanceof DateType) {
+          return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime());
+        } else if (dataType instanceof TimestampType) {
+          return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime());
+        } else if (dataType instanceof ByteType) {
+          return ZOrderingUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index));
+        } else if (dataType instanceof ShortType) {
+          return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index));
+        } else if (dataType instanceof DecimalType) {
+          return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue());
+        } else if (dataType instanceof BooleanType) {
+          boolean value = row.isNullAt(index) ? false : row.getBoolean(index);
+          return ZOrderingUtil.intTo8Byte(value ? 1 : 0);
+        } else if (dataType instanceof BinaryType) {
+          return ZOrderingUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index));
+        }
+        return null;
+      }).filter(f -> f != null).collect(Collectors.toList());
+      byte[][] zBytes = new byte[zBytesList.size()][];
+      for (int i = 0; i < zBytesList.size(); i++) {
+        zBytes[i] = zBytesList.get(i);
+      }
+      List<Object> zValues = new ArrayList<>();
+      zValues.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
+      zValues.add(ZOrderingUtil.interleaving(zBytes, 8));
+      return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(zValues));
+    }).sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true, fileNum);
+  }
+
+  private static JavaRDD<Row> createHilbertSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
+    return originRDD.mapPartitions(rows -> {
+      HilbertCurve hilbertCurve = HilbertCurve.bits(63).dimensions(fieldMap.size());
+      return new Iterator<Row>() {
+
+        @Override
+        public boolean hasNext() {
+          return rows.hasNext();
+        }
+
+        @Override
+        public Row next() {
+          Row row = rows.next();
+          List<Long> longList = fieldMap.entrySet().stream().map(entry -> {
+            int index = entry.getKey();
+            StructField field = entry.getValue();
+            DataType dataType = field.dataType();
+            if (dataType instanceof LongType) {
+              return row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index);
+            } else if (dataType instanceof DoubleType) {
+              return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits(row.getDouble(index));
+            } else if (dataType instanceof IntegerType) {
+              return row.isNullAt(index) ? Long.MAX_VALUE : (long)row.getInt(index);
+            } else if (dataType instanceof FloatType) {
+              return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits((double) row.getFloat(index));
+            } else if (dataType instanceof StringType) {
+              return row.isNullAt(index) ? Long.MAX_VALUE : ZOrderingUtil.convertStringToLong(row.getString(index));
+            } else if (dataType instanceof DateType) {
+              return row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime();
+            } else if (dataType instanceof TimestampType) {
+              return row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime();
+            } else if (dataType instanceof ByteType) {
+              return row.isNullAt(index) ? Long.MAX_VALUE : ZOrderingUtil.convertBytesToLong(new byte[] {row.getByte(index)});
+            } else if (dataType instanceof ShortType) {
+              return row.isNullAt(index) ? Long.MAX_VALUE : (long)row.getShort(index);
+            } else if (dataType instanceof DecimalType) {
+              return row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue();
+            } else if (dataType instanceof BooleanType) {
+              boolean value = row.isNullAt(index) ? false : row.getBoolean(index);
+              return value ? Long.MAX_VALUE : 0;
+            } else if (dataType instanceof BinaryType) {
+              return row.isNullAt(index) ? Long.MAX_VALUE : ZOrderingUtil.convertBytesToLong((byte[]) row.get(index));
+            }
+            return null;
+          }).filter(f -> f != null).collect(Collectors.toList());
+
+          byte[] hilbertValue = HilbertCurveUtils.indexBytes(
+              hilbertCurve, longList.stream().mapToLong(l -> l).toArray(), 63);
+          List<Object> values = new ArrayList<>();
+          values.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
+          values.add(hilbertValue);
+          return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(values));
+        }
+      };
+    }).sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true, fileNum);
+  }
+
+  public static Dataset<Row> createOptimizedDataFrameByMapValue(Dataset<Row> df, String sortCols, int fileNum, String sortMode) {
+    if (sortCols == null || sortCols.isEmpty() || fileNum <= 0) {
+      return df;
+    }
+    return createOptimizedDataFrameByMapValue(df,
+        Arrays.stream(sortCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum, sortMode);
+  }
+
+  public static Dataset<Row> createOptimizeDataFrameBySample(Dataset<Row> df, List<String> zCols, int fileNum, String sortMode) {
+    return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum, sortMode);
+  }
+
+  public static Dataset<Row> createOptimizeDataFrameBySample(Dataset<Row> df, String zCols, int fileNum, String sortMode) {
+    if (zCols == null || zCols.isEmpty() || fileNum <= 0) {
+      return df;
+    }
+    return createOptimizeDataFrameBySample(df, Arrays.stream(zCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum, sortMode);
+  }
+
+  /**
+   * Parse min/max statistics stored in parquet footers for the sort columns.
+   * Collecting statistics from TimestampType is not supported, since parquet files do not collect statistics for that type.
+   * TODO: adapt for RFC-27.
+   *
+   * @param df a Spark DataFrame holding the parquet files to be read.
+   * @param cols sort columns
+   * @return a DataFrame holding all statistics info.
+   */
+  public static Dataset<Row> getMinMaxValue(Dataset<Row> df, List<String> cols) {
+    Map<String, DataType> columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e.dataType()));
+
+    List<String> scanFiles = Arrays.asList(df.inputFiles());
+    SparkContext sc = df.sparkSession().sparkContext();
+    JavaSparkContext jsc = new JavaSparkContext(sc);
+
+    SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration());
+    int numParallelism = (scanFiles.size() / 3 + 1);
+    List<HoodieColumnRangeMetadata<Comparable>> colMinMaxInfos;
+    String previousJobDescription = sc.getLocalProperty(SPARK_JOB_DESCRIPTION);
+    try {
+      jsc.setJobDescription("Listing parquet column statistics");
+      colMinMaxInfos = jsc.parallelize(scanFiles, numParallelism).mapPartitions(paths -> {
+        Configuration conf = serializableConfiguration.value();
+        ParquetUtils parquetUtils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
+        List<Collection<HoodieColumnRangeMetadata<Comparable>>> results = new ArrayList<>();
+        while (paths.hasNext()) {
+          String path = paths.next();
+          results.add(parquetUtils.readRangeFromParquetMetadata(conf, new Path(path), cols));
+        }
+        return results.stream().flatMap(f -> f.stream()).iterator();
+      }).collect();
+    } finally {
+      jsc.setJobDescription(previousJobDescription);
+    }
+
+    Map<String, List<HoodieColumnRangeMetadata<Comparable>>> fileToStatsListMap = colMinMaxInfos.stream().collect(Collectors.groupingBy(e -> e.getFilePath()));
+    JavaRDD<Row> allMetaDataRDD = jsc.parallelize(new ArrayList<>(fileToStatsListMap.values()), 1).map(f -> {
+      int colSize = f.size();
+      if (colSize == 0) {
+        return null;
+      } else {
+        List<Object> rows = new ArrayList<>();
+        rows.add(f.get(0).getFilePath());
+        cols.stream().forEach(col -> {
+          HoodieColumnRangeMetadata<Comparable> currentColRangeMetaData =
+              f.stream().filter(s -> s.getColumnName().trim().equalsIgnoreCase(col)).findFirst().orElse(null);
+          DataType colType = columnsMap.get(col);
+          if (currentColRangeMetaData == null || colType == null) {
+            throw new HoodieException(String.format("cannot collect min/max statistics for col: %s", col));
+          }
+          if (colType instanceof IntegerType) {
+            rows.add(currentColRangeMetaData.getMinValue());
+            rows.add(currentColRangeMetaData.getMaxValue());
+          } else if (colType instanceof DoubleType) {
+            rows.add(currentColRangeMetaData.getMinValue());
+            rows.add(currentColRangeMetaData.getMaxValue());
+          } else if (colType instanceof StringType) {
+            rows.add(currentColRangeMetaData.getMinValue().toString());
+            rows.add(currentColRangeMetaData.getMaxValue().toString());
+          } else if (colType instanceof DecimalType) {
+            rows.add(new BigDecimal(currentColRangeMetaData.getMinValue().toString()));
+            rows.add(new BigDecimal(currentColRangeMetaData.getMaxValue().toString()));
+          } else if (colType instanceof DateType) {
+            rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMinValue().toString()));
+            rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMaxValue().toString()));
+          } else if (colType instanceof LongType) {
+            rows.add(currentColRangeMetaData.getMinValue());
+            rows.add(currentColRangeMetaData.getMaxValue());
+          } else if (colType instanceof ShortType) {
+            rows.add(Short.parseShort(currentColRangeMetaData.getMinValue().toString()));
+            rows.add(Short.parseShort(currentColRangeMetaData.getMaxValue().toString()));
+          } else if (colType instanceof FloatType) {
+            rows.add(currentColRangeMetaData.getMinValue());
+            rows.add(currentColRangeMetaData.getMaxValue());
+          } else if (colType instanceof BinaryType) {
+            rows.add(((Binary)currentColRangeMetaData.getMinValue()).getBytes());
+            rows.add(((Binary)currentColRangeMetaData.getMaxValue()).getBytes());
+          } else if (colType instanceof BooleanType) {
+            rows.add(currentColRangeMetaData.getMinValue());
+            rows.add(currentColRangeMetaData.getMaxValue());
+          } else if (colType instanceof ByteType) {
+            rows.add(Byte.valueOf(currentColRangeMetaData.getMinValue().toString()));
+            rows.add(Byte.valueOf(currentColRangeMetaData.getMaxValue().toString()));
+          } else {
+            throw new HoodieException(String.format("Unsupported type: %s", colType));
+          }
+          rows.add(currentColRangeMetaData.getNumNulls());
+        });
+        return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(rows));
+      }
+    }).filter(f -> f != null);
+    List<StructField> allMetaDataSchema = new ArrayList<>();
+    allMetaDataSchema.add(new StructField("file", StringType$.MODULE$, true, 
Metadata.empty()));
+    cols.forEach(col -> {
+      allMetaDataSchema.add(new StructField(col + "_minValue", 
columnsMap.get(col), true, Metadata.empty()));
+      allMetaDataSchema.add(new StructField(col + "_maxValue", 
columnsMap.get(col), true, Metadata.empty()));
+      allMetaDataSchema.add(new StructField(col + "_num_nulls", 
LongType$.MODULE$, true, Metadata.empty()));
+    });
+    return df.sparkSession().createDataFrame(allMetaDataRDD, 
StructType$.MODULE$.apply(allMetaDataSchema));
+  }
+
+  public static Dataset<Row> getMinMaxValue(Dataset<Row> df, String cols) {
+    List<String> rawCols = Arrays.asList(cols.split(",")).stream().map(f -> 
f.trim()).collect(Collectors.toList());
+    return getMinMaxValue(df, rawCols);
+  }
+
+  /**
+   * Update statistics info.
+   * This method updates the old index table by a full outer join,
+   * saves the updated table into a new index table named after commitTime,
+   * and cleans up the old index table.
+   *
+   * @param df a Spark DataFrame holding the parquet files to be read.
+   * @param cols sort columns.
+   * @param indexPath index store path.
+   * @param commitTime current operation commitTime.
+   * @param validateCommits all valid commits for the current table.
+   */
+  public static void saveStatisticsInfo(Dataset<Row> df, String cols, String indexPath, String commitTime, List<String> validateCommits) {
+    Path savePath = new Path(indexPath, commitTime);
+    SparkSession spark = df.sparkSession();
+    FileSystem fs = FSUtils.getFs(indexPath, spark.sparkContext().hadoopConfiguration());
+    Dataset<Row> statisticsDF = OrderingIndexHelper.getMinMaxValue(df, cols);
+    // try to find last validate index table from index path
+    try {
+      // If there's currently no index, create one
+      if (!fs.exists(new Path(indexPath))) {
+        statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
+        return;
+      }
+
+      // Otherwise, clean up all indexes but the most recent one
+
+      List<String> allIndexTables = Arrays
+          .stream(fs.listStatus(new Path(indexPath))).filter(f -> f.isDirectory()).map(f -> f.getPath().getName()).collect(Collectors.toList());
+      List<String> candidateIndexTables = allIndexTables.stream().filter(f -> validateCommits.contains(f)).sorted().collect(Collectors.toList());
+      List<String> residualTables = allIndexTables.stream().filter(f -> !validateCommits.contains(f)).collect(Collectors.toList());
+      Option<Dataset> latestIndexData = Option.empty();
+      if (!candidateIndexTables.isEmpty()) {
+        latestIndexData = Option.of(spark.read().load(new Path(indexPath, candidateIndexTables.get(candidateIndexTables.size() - 1)).toString()));
+        // clean old index table, keep at most 1 index table.
+        candidateIndexTables.remove(candidateIndexTables.size() - 1);
+        candidateIndexTables.forEach(f -> {
+          try {
+            fs.delete(new Path(indexPath, f));
+          } catch (IOException ie) {
+            throw new HoodieException(ie);
+          }
+        });
+      }
+
+      // clean residual tables:
+      // retried clustering operations at the same instant time are also considered,
+      // and the residual files they produced are cleaned up before saving statistics.
+      // save statistics info to the index table named after commitTime
+      residualTables.forEach(f -> {
+        try {
+          fs.delete(new Path(indexPath, f));
+        } catch (IOException ie) {
+          throw new HoodieException(ie);
+        }
+      });
+
+      if (latestIndexData.isPresent() && latestIndexData.get().schema().equals(statisticsDF.schema())) {
+        // update the statistics info
+        String originalTable = "indexTable_" + java.util.UUID.randomUUID().toString().replace("-", "");
+        String updateTable = "updateTable_" + java.util.UUID.randomUUID().toString().replace("-", "");
+        latestIndexData.get().registerTempTable(originalTable);
+        statisticsDF.registerTempTable(updateTable);
+        // update table by full outer join
+        List columns = Arrays.asList(statisticsDF.schema().fieldNames());
+        spark.sql(ZOrderingIndexHelper.createIndexMergeSql(originalTable, updateTable, columns)).repartition(1).write().save(savePath.toString());
+      } else {
+        statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
+      }
+    } catch (IOException e) {
+      throw new HoodieException(e);
+    }
+  }
+}
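
For orientation (illustrative only, not part of the commit): the entry points above as TestTableLayoutOptimization exercises them later in this diff; `df` is any DataFrame backed by parquet files, and the comma-separated column list, target file count, index path and commit times are placeholders.

    import org.apache.spark.OrderingIndexHelper
    import scala.collection.JavaConverters._

    // layout-optimized rewrites of df, keyed by the chosen space-filling curve
    val byMap    = OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1,c2,c3", 20, "hilbert")
    val bySample = OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1,c2,c3", 20, "z-order")

    // per-file min/max/null-count statistics for data skipping, persisted under <indexPath>/<commitTime>
    val stats = OrderingIndexHelper.getMinMaxValue(df, "c1,c2,c3")
    OrderingIndexHelper.saveStatisticsInfo(df, "c1,c2,c3", "/tmp/index", "2", Seq("0", "1").asJava)
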
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala
index c392f12..a168e55 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala
@@ -19,15 +19,16 @@
 package org.apache.spark.sql.hudi.execution
 
 import org.apache.hudi.config.HoodieClusteringConfig
+import org.apache.hudi.optimize.{HilbertCurveUtils, ZOrderingUtil}
 import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, BoundReference, SortOrder, UnsafeProjection, UnsafeRow}
-import org.apache.hudi.optimize.ZOrderingUtil
-import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, BoundReference, SortOrder, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.util.MutablePair
 import org.apache.spark.util.random.SamplingUtils
+import org.davidmoten.hilbert.HilbertCurve
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -35,10 +36,10 @@ import scala.reflect.{ClassTag, classTag}
 import scala.util.hashing.byteswap32
 
 class RangeSample[K: ClassTag, V](
-     zEncodeNum: Int,
-     rdd: RDD[_ <: Product2[K, V]],
-     private var ascend: Boolean = true,
-     val samplePointsPerPartitionHint: Int = 20) extends Serializable {
+                                   zEncodeNum: Int,
+                                   rdd: RDD[_ <: Product2[K, V]],
+                                   private var ascend: Boolean = true,
+                                   val samplePointsPerPartitionHint: Int = 20) extends Serializable {
 
   // We allow zEncodeNum = 0, which happens when sorting an empty RDD under the default settings.
   require(zEncodeNum >= 0, s"Number of zEncodeNum cannot be negative but found $zEncodeNum.")
@@ -335,16 +336,21 @@ object RangeSampleSort {
   }
 
   /**
-    * create z-order DataFrame by sample
-    * first, sample origin data to get z-cols bounds, then create z-order DataFrame
+    * create an optimized DataFrame by sampling
+    * first, sample the original data to get the bounds of the ordering columns, then apply the sort to produce the DataFrame
     * support all type data.
-    * this method need more resource and cost more time than createZIndexedDataFrameByMapValue
+    * this method needs more resources and takes more time than createOptimizedDataFrameByMapValue
     */
-  def sortDataFrameBySample(df: DataFrame, zCols: Seq[String], fileNum: Int): DataFrame = {
+  def sortDataFrameBySample(df: DataFrame, zCols: Seq[String], fileNum: Int, sortMode: String): DataFrame = {
     val spark = df.sparkSession
     val columnsMap = df.schema.fields.map(item => (item.name, item)).toMap
     val fieldNum = df.schema.fields.length
     val checkCols = zCols.filter(col => columnsMap(col) != null)
+    val useHilbert = sortMode match {
+      case "hilbert" => true
+      case "z-order" => false
+      case other => throw new IllegalArgumentException(s"new only support 
z-order/hilbert optimize but find: ${other}")
+    }
 
     if (zCols.isEmpty || checkCols.isEmpty) {
       df
@@ -366,7 +372,7 @@ object RangeSampleSort {
       }.filter(_._1 != -1)
       // Complex type found, use createZIndexedDataFrameByRange
       if (zFields.length != zCols.length) {
-        return sortDataFrameBySampleSupportAllTypes(df, zCols, fieldNum)
+        return sortDataFrameBySampleSupportAllTypes(df, zCols, fileNum)
       }
 
       val rawRdd = df.rdd
@@ -441,6 +447,7 @@ object RangeSampleSort {
       val boundBroadCast = spark.sparkContext.broadcast(expandSampleBoundsWithFactor)
 
       val indexRdd = rawRdd.mapPartitions { iter =>
+        val hilbertCurve = if (useHilbert) Some(HilbertCurve.bits(32).dimensions(zFields.length)) else None
         val expandBoundsWithFactor = boundBroadCast.value
         val maxBoundNum = expandBoundsWithFactor.map(_._1.length).max
         val longDecisionBound = new RawDecisionBound(Ordering[Long])
@@ -507,17 +514,21 @@ object RangeSampleSort {
               case _ =>
                 -1
             }
-          }.filter(v => v != -1).map(ZOrderingUtil.intTo8Byte(_)).toArray
-          val zValues = ZOrderingUtil.interleaving(values, 8)
-          Row.fromSeq(row.toSeq ++ Seq(zValues))
+          }.filter(v => v != -1)
+          val mapValues = if (hilbertCurve.isDefined) {
+            HilbertCurveUtils.indexBytes(hilbertCurve.get, values.map(_.toLong).toArray, 32)
+          } else {
+            ZOrderingUtil.interleaving(values.map(ZOrderingUtil.intTo8Byte(_)).toArray, 8)
+          }
+          Row.fromSeq(row.toSeq ++ Seq(mapValues))
         }
       }.sortBy(x => ZorderingBinarySort(x.getAs[Array[Byte]](fieldNum)), numPartitions = fileNum)
       val newDF = df.sparkSession.createDataFrame(indexRdd, StructType(
         df.schema.fields ++ Seq(
-          StructField(s"zindex",
+          StructField(s"index",
             BinaryType, false))
       ))
-      newDF.drop("zindex")
+      newDF.drop("index")
     }
   }
 }
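
A short sketch of the sample-based path above (illustrative only, `df` is a placeholder): sortDataFrameBySample now takes the sort mode explicitly, samples the data to build per-column range bounds, maps each row's sort columns to bounded integers, and then encodes them either by bit interleaving (z-order) or through the 32-bit Hilbert curve before range-sorting into fileNum output files.

    import org.apache.spark.sql.hudi.execution.RangeSampleSort

    val sorted = RangeSampleSort.sortDataFrameBySample(df, Seq("c1", "c2"), 20, "hilbert")
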
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala
new file mode 100644
index 0000000..4b7864f
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.model.HoodieFileFormat
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.common.util.{BaseFileUtils, ParquetUtils}
+import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
+import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
+import org.apache.spark.OrderingIndexHelper
+import org.apache.spark.sql._
+import org.apache.spark.sql.types._
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.Arguments.arguments
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+import java.sql.{Date, Timestamp}
+import scala.collection.JavaConversions._
+import scala.util.Random
+
+@Tag("functional")
+class TestTableLayoutOptimization extends HoodieClientTestBase {
+  var spark: SparkSession = _
+
+  val commonOpts = Map(
+    "hoodie.insert.shuffle.parallelism" -> "4",
+    "hoodie.upsert.shuffle.parallelism" -> "4",
+    "hoodie.bulkinsert.shuffle.parallelism" -> "4",
+    DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key",
+    DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "partition",
+    DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "timestamp",
+    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+  )
+
+  @BeforeEach override def setUp() {
+    initPath()
+    initSparkContexts()
+    spark = sqlContext.sparkSession
+    initTestDataGenerator()
+    initFileSystem()
+  }
+
+  @AfterEach override def tearDown() = {
+    cleanupSparkContexts()
+    cleanupTestDataGenerator()
+    cleanupFileSystem()
+  }
+
+  @ParameterizedTest
+  @MethodSource(Array("testLayOutParameter"))
+  def testOptimizewithClustering(tableType: String, optimizeMode: String): Unit = {
+    val targetRecordsCount = 10000
+    // Bulk Insert Operation
+    val records = recordsToStrings(dataGen.generateInserts("001", targetRecordsCount)).toList
+    val writeDf: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+    writeDf.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option("hoodie.compact.inline", "false")
+      .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType)
+      // option for clustering
+      .option("hoodie.parquet.small.file.limit", "0")
+      .option("hoodie.clustering.inline", "true")
+      .option("hoodie.clustering.inline.max.commits", "1")
+      .option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
+      .option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
+      .option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString)
+      .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(64 * 1024 * 1024L))
+      .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true")
+      .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key(), optimizeMode)
+      .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "begin_lat, begin_lon")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val readDf =
+      spark.read
+        .format("hudi")
+        .load(basePath)
+
+    val readDfSkip =
+      spark.read
+        .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true")
+        .format("hudi")
+        .load(basePath)
+
+    assertEquals(targetRecordsCount, readDf.count())
+    assertEquals(targetRecordsCount, readDfSkip.count())
+
+    readDf.createOrReplaceTempView("hudi_snapshot_raw")
+    readDfSkip.createOrReplaceTempView("hudi_snapshot_skipping")
+
+    def select(tableName: String) =
+      spark.sql(s"SELECT * FROM $tableName WHERE begin_lat >= 0.49 AND 
begin_lat < 0.51 AND begin_lon >= 0.49 AND begin_lon < 0.51")
+
+    assertRowsMatch(
+      select("hudi_snapshot_raw"),
+      select("hudi_snapshot_skipping")
+    )
+  }
+
+  def assertRowsMatch(one: DataFrame, other: DataFrame) = {
+    val rows = one.count()
+    assert(rows == other.count() && one.intersect(other).count() == rows)
+  }
+
+  @Test
+  def testCollectMinMaxStatistics(): Unit = {
+    val testPath = new Path(System.getProperty("java.io.tmpdir"), "minMax")
+    val statisticPath = new Path(System.getProperty("java.io.tmpdir"), "stat")
+    val fs = testPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
+    val complexDataFrame = createComplexDataFrame(spark)
+    complexDataFrame.repartition(3).write.mode("overwrite").save(testPath.toString)
+    val df = spark.read.load(testPath.toString)
+    try {
+      // test z-order/hilbert sort for all primitive types;
+      // should not throw an exception.
+      OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1,c2,c3,c5,c6,c7,c8", 20, "hilbert").show(1)
+      OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1,c2,c3,c5,c6,c7,c8", 20, "z-order").show(1)
+      OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1,c2,c3,c5,c6,c7,c8", 20, "hilbert").show(1)
+      OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1,c2,c3,c5,c6,c7,c8", 20, "z-order").show(1)
+      try {
+        // TimestampType is not supported, so collecting statistics for c4 should throw an exception
+        val colDf = OrderingIndexHelper.getMinMaxValue(df, "c1,c2,c3,c5,c6,c7,c8")
+        colDf.cache()
+        assertEquals(colDf.count(), 3)
+        assertEquals(colDf.take(1)(0).length, 22)
+        colDf.unpersist()
+        // try to save statistics
+        OrderingIndexHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", 
statisticPath.toString, "2", Seq("0", "1"))
+        // save again
+        OrderingIndexHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", 
statisticPath.toString, "3", Seq("0", "1", "2"))
+        // test old index table clean
+        OrderingIndexHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", 
statisticPath.toString, "4", Seq("0", "1", "3"))
+        assertEquals(!fs.exists(new Path(statisticPath, "2")), true)
+        assertEquals(fs.exists(new Path(statisticPath, "3")), true)
+        // test to save different index, new index on ("c1,c6,c7,c8") should 
be successfully saved.
+        OrderingIndexHelper.saveStatisticsInfo(df, "c1,c6,c7,c8", 
statisticPath.toString, "5", Seq("0", "1", "3", "4"))
+        assertEquals(fs.exists(new Path(statisticPath, "5")), true)
+      } finally {
+        if (fs.exists(testPath)) fs.delete(testPath)
+        if (fs.exists(statisticPath)) fs.delete(statisticPath)
+      }
+    }
+  }
+
+  // Test collecting min/max statistics for DateType under multithreading.
+  // Parquet can return wrong statistics for DateType when footers are read concurrently.
+  @Test
+  def testMultiThreadParquetFooterReadForDateType(): Unit = {
+    // create parquet file with DateType
+    val rdd = spark.sparkContext.parallelize(0 to 100, 1)
+      .map(item => RowFactory.create(Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}")))
+    val df = spark.createDataFrame(rdd, new StructType().add("id", DateType))
+    val testPath = new Path(System.getProperty("java.io.tmpdir"), "testCollectDateType")
+    val conf = spark.sparkContext.hadoopConfiguration
+    val cols = new java.util.ArrayList[String]
+    cols.add("id")
+    try {
+      df.repartition(3).write.mode("overwrite").save(testPath.toString)
+      val inputFiles = spark.read.load(testPath.toString).inputFiles.sortBy(x => x)
+
+      val realResult = new Array[(String, String)](3)
+      inputFiles.zipWithIndex.foreach { case (f, index) =>
+        val fileUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils]
+        val res = fileUtils.readRangeFromParquetMetadata(conf, new Path(f), cols).iterator().next()
+        realResult(index) = (res.getMinValue.toString, res.getMaxValue.toString)
+      }
+
+      // multi thread read with no lock
+      val resUseLock = new Array[(String, String)](3)
+      inputFiles.zipWithIndex.par.foreach { case (f, index) =>
+        val fileUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils]
+        val res = fileUtils.readRangeFromParquetMetadata(conf, new Path(f), cols).iterator().next()
+        resUseLock(index) = (res.getMinValue.toString, res.getMaxValue.toString)
+      }
+
+      // check resUseNoLock:
+      // we can't guarantee that multithreading will always expose the problem,
+      // so to keep the test stable we do not check resUseNoLock.
+      // check resUseLock: the assertion below should pass.
+      realResult.zip(resUseLock).foreach { case (realValue, testValue) =>
+        assert(realValue == testValue, s" expect realValue: ${realValue} but find ${testValue}")
+      }
+    } finally {
+      if (fs.exists(testPath)) fs.delete(testPath)
+    }
+  }
+
+  def createComplexDataFrame(spark: SparkSession): DataFrame = {
+    val schema = new StructType()
+      .add("c1", IntegerType)
+      .add("c2", StringType)
+      .add("c3", DecimalType(9,3))
+      .add("c4", TimestampType)
+      .add("c5", ShortType)
+      .add("c6", DateType)
+      .add("c7", BinaryType)
+      .add("c8", ByteType)
+
+    val rdd = spark.sparkContext.parallelize(0 to 1000, 1).map { item =>
+      val c1 = Integer.valueOf(item)
+      val c2 = s" ${item}sdc"
+      val c3 = new java.math.BigDecimal(s"${Random.nextInt(1000)}.${item}")
+      val c4 = new Timestamp(System.currentTimeMillis())
+      val c5 = java.lang.Short.valueOf(s"${(item + 16) / 10}")
+      val c6 = Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}")
+      val c7 = Array(item).map(_.toByte)
+      val c8 = java.lang.Byte.valueOf("9")
+
+      RowFactory.create(c1, c2, c3, c4, c5, c6, c7, c8)
+    }
+    spark.createDataFrame(rdd, schema)
+  }
+}
+
+object TestTableLayoutOptimization {
+  def testLayOutParameter(): java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+      arguments("COPY_ON_WRITE", "hilbert"),
+      arguments("COPY_ON_WRITE", "z-order"),
+      arguments("MERGE_ON_READ", "hilbert"),
+      arguments("MERGE_ON_READ", "z-order")
+    )
+  }
+}
+
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala
new file mode 100644
index 0000000..c8263b3
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.OrderingIndexHelper
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.hudi.TestHoodieSqlBase
+
+import scala.util.Random
+
+object SpaceCurveOptimizeBenchMark extends TestHoodieSqlBase {
+
+  def getSkippingPercent(tableName: String, co1: String, co2: String, value1: Int, value2: Int): Unit = {
+    val minMax = OrderingIndexHelper
+      .getMinMaxValue(spark.sql(s"select * from ${tableName}"), s"${co1}, ${co2}")
+      .collect().map(f => (f.getInt(1), f.getInt(2), f.getInt(4), f.getInt(5)))
+    var c = 0
+    for (elem <- minMax) {
+      if ((elem._1 <= value1 && elem._2 >= value1) || (elem._3 <= value2 && elem._4 >= value2)) {
+        c = c + 1
+      }
+    }
+
+    val p = c / minMax.size.toDouble
+    println(s"for table ${tableName} with query filter: ${co1} = ${value1} or 
${co2} = ${value2} we can achieve skipping percent ${1.0 - p}")
+  }
+
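To make the metric concrete: getSkippingPercent counts how many file-level min/max ranges could contain either filter value and reports the complement as the fraction of files a reader may skip. A toy example with invented numbers:

    // Toy illustration of the arithmetic (values invented): if 40 of 200 file ranges
    // overlap the filter, then p = 0.2 and 80% of the files can be skipped.
    val totalFiles = 200
    val overlappingFiles = 40
    val p = overlappingFiles / totalFiles.toDouble
    println(s"skipping percent = ${1.0 - p}")   // 0.8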
+  /*
+  for table table_z_sort_byMap with query filter: c1_int = 500000 or c2_int = 500000 we can achieve skipping percent 0.8
+  for table table_z_sort_bySample with query filter: c1_int = 500000 or c2_int = 500000 we can achieve skipping percent 0.77
+  for table table_hilbert_sort_byMap with query filter: c1_int = 500000 or c2_int = 500000 we can achieve skipping percent 0.855
+  for table table_hilbert_sort_bySample with query filter: c1_int = 500000 or c2_int = 500000 we can achieve skipping percent 0.83
+  */
+  def runNormalTableSkippingBenchMark(): Unit = {
+    withTempDir { f =>
+      withTempTable("table_z_sort_byMap", "table_z_sort_bySample", 
"table_hilbert_sort_byMap", "table_hilbert_sort_bySample") {
+        prepareInterTypeTable(new Path(f.getAbsolutePath), 1000000)
+        // choose median value as filter condition.
+        // the median value of c1_int is 500000
+        // the median value of c2_int is 500000
+        getSkippingPercent("table_z_sort_byMap", "c1_int", "c2_int", 500000, 
500000)
+        getSkippingPercent("table_z_sort_bySample", "c1_int", "c2_int", 
500000, 500000)
+        getSkippingPercent("table_hilbert_sort_byMap", "c1_int", "c2_int", 
500000, 500000)
+        getSkippingPercent("table_hilbert_sort_bySample", "c1_int", "c2_int", 
500000, 500000)
+      }
+    }
+  }
+
+  /*
+  for table table_z_sort_byMap_skew with query filter: c1_int = 5000 or c2_int = 500000 we can achieve skipping percent 0.0
+  for table table_z_sort_bySample_skew with query filter: c1_int = 5000 or c2_int = 500000 we can achieve skipping percent 0.78
+  for table table_hilbert_sort_byMap_skew with query filter: c1_int = 5000 or c2_int = 500000 we can achieve skipping percent 0.05500000000000005
+  for table table_hilbert_sort_bySample_skew with query filter: c1_int = 5000 or c2_int = 500000 we can achieve skipping percent 0.84
+  */
+  def runSkewTableSkippingBenchMark(): Unit = {
+    withTempDir { f =>
+      withTempTable("table_z_sort_byMap_skew", "table_z_sort_bySample_skew", 
"table_hilbert_sort_byMap_skew", "table_hilbert_sort_bySample_skew") {
+        // prepare skewed table.
+        prepareInterTypeTable(new Path(f.getAbsolutePath), 1000000, 10000, 1000000, true)
+        // choose median value as filter condition.
+        // the median value of c1_int is 5000
+        // the median value of c2_int is 500000
+        getSkippingPercent("table_z_sort_byMap_skew", "c1_int", "c2_int", 
5000, 500000)
+        getSkippingPercent("table_z_sort_bySample_skew", "c1_int", "c2_int", 
5000, 500000)
+        getSkippingPercent("table_hilbert_sort_byMap_skew", "c1_int", 
"c2_int", 5000, 500000)
+        getSkippingPercent("table_hilbert_sort_bySample_skew", "c1_int", 
"c2_int", 5000, 500000)
+      }
+    }
+  }
+
+  def main(args: Array[String]): Unit = {
+    runNormalTableSkippingBenchMark()
+    runSkewTableSkippingBenchMark()
+  }
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+    try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def prepareInterTypeTable(tablePath: Path, numRows: Int, col1Range: Int = 1000000, col2Range: Int = 1000000, skewed: Boolean = false): Unit = {
+    import spark.implicits._
+    val df = spark.range(numRows).map(_ => (Random.nextInt(col1Range), Random.nextInt(col2Range))).toDF("c1_int", "c2_int")
+    val dfOptimizeByMap = OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1_int, c2_int", 200, "z-order")
+    val dfOptimizeBySample = OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1_int, c2_int", 200, "z-order")
+
+    val dfHilbertOptimizeByMap = OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1_int, c2_int", 200, "hilbert")
+    val dfHilbertOptimizeBySample = OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1_int, c2_int", 200, "hilbert")
+
+    saveAsTable(dfOptimizeByMap, tablePath, if (skewed) "z_sort_byMap_skew" else "z_sort_byMap")
+    saveAsTable(dfOptimizeBySample, tablePath, if (skewed) "z_sort_bySample_skew" else "z_sort_bySample")
+    saveAsTable(dfHilbertOptimizeByMap, tablePath, if (skewed) "hilbert_sort_byMap_skew" else "hilbert_sort_byMap")
+    saveAsTable(dfHilbertOptimizeBySample, tablePath, if (skewed) "hilbert_sort_bySample_skew" else "hilbert_sort_bySample")
+  }
+
+  def saveAsTable(df: DataFrame, savePath: Path, suffix: String): Unit = {
+    df.write.mode("overwrite").save(new Path(savePath, suffix).toString)
+    spark.read.parquet(new Path(savePath, suffix).toString).createOrReplaceTempView("table_" + suffix)
+  }
+}
+
diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml
index 5f45fdd..1766a98 100644
--- a/packaging/hudi-flink-bundle/pom.xml
+++ b/packaging/hudi-flink-bundle/pom.xml
@@ -107,6 +107,8 @@
                   <include>com.fasterxml.jackson.core:jackson-databind</include>
                   <include>com.fasterxml.jackson.core:jackson-core</include>
 
+                  <include>com.github.davidmoten:guava-mini</include>
+                  <include>com.github.davidmoten:hilbert-curve</include>
                   <include>com.twitter:bijection-avro_${scala.binary.version}</include>
                   <include>com.twitter:bijection-core_${scala.binary.version}</include>
                   <include>io.dropwizard.metrics:metrics-core</include>
diff --git a/packaging/hudi-kafka-connect-bundle/pom.xml b/packaging/hudi-kafka-connect-bundle/pom.xml
index cd7b151..b8a4bcd 100644
--- a/packaging/hudi-kafka-connect-bundle/pom.xml
+++ b/packaging/hudi-kafka-connect-bundle/pom.xml
@@ -89,6 +89,8 @@
                                     <include>org.apache.flink:flink-core</include>
                                     <include>org.apache.flink:flink-hadoop-compatibility_${scala.binary.version}</include>
 
+                                    <include>com.github.davidmoten:guava-mini</include>
+                                    <include>com.github.davidmoten:hilbert-curve</include>
                                     <include>com.yammer.metrics:metrics-core</include>
                                     <include>com.beust:jcommander</include>
                                     <include>io.javalin:javalin</include>
diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml
index 32a9abf..02d11be 100644
--- a/packaging/hudi-spark-bundle/pom.xml
+++ b/packaging/hudi-spark-bundle/pom.xml
@@ -88,6 +88,8 @@
                   <include>org.antlr:stringtemplate</include>
                   <include>org.apache.parquet:parquet-avro</include>
 
+                  <include>com.github.davidmoten:guava-mini</include>
+                  <include>com.github.davidmoten:hilbert-curve</include>
                   <include>com.twitter:bijection-avro_${scala.binary.version}</include>
                   <include>com.twitter:bijection-core_${scala.binary.version}</include>
                   <include>io.dropwizard.metrics:metrics-core</include>
diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml
index 568e9d5..798c27f 100644
--- a/packaging/hudi-utilities-bundle/pom.xml
+++ b/packaging/hudi-utilities-bundle/pom.xml
@@ -117,6 +117,8 @@
                   <include>com.amazonaws:aws-java-sdk-dynamodb</include>
                   <include>com.amazonaws:aws-java-sdk-core</include>
 
+                  <include>com.github.davidmoten:guava-mini</include>
+                  <include>com.github.davidmoten:hilbert-curve</include>
                   <include>com.twitter:bijection-avro_${scala.binary.version}</include>
                   <include>com.twitter:bijection-core_${scala.binary.version}</include>
                   <include>io.confluent:kafka-avro-serializer</include>
