This is an automated email from the ASF dual-hosted git repository.
zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6facb71458 [Core] Add order type in paimon (#5020)
6facb71458 is described below
commit 6facb714588c627a4f7f82ee08b982a950233737
Author: Fang Yong <[email protected]>
AuthorDate: Tue Feb 11 15:57:23 2025 +0800
[Core] Add order type in paimon (#5020)
---
.../main/java/org/apache/paimon/CoreOptions.java | 33 ++++++++++++++++++++
.../org/apache/paimon/sort/zorder/ZIndexer.java | 5 ++-
.../paimon/sort/zorder}/ZOrderByteUtils.java | 2 +-
.../paimon/sort/zorder}/TestZOrderByteUtil.java | 2 +-
.../apache/paimon/sort/zorder/ZIndexerTest.java | 1 -
.../paimon/flink/action/SortCompactAction.java | 2 +-
.../apache/paimon/flink/sink/FlinkSinkBuilder.java | 8 ++---
.../apache/paimon/flink/sorter/TableSortInfo.java | 2 +-
.../apache/paimon/flink/sorter/TableSorter.java | 31 +------------------
.../paimon/flink/sorter/TableSortInfoTest.java | 2 +-
.../paimon/spark/procedure/CompactProcedure.java | 13 ++++----
.../apache/paimon/spark/sort/SparkZOrderUDF.java | 2 +-
.../org/apache/paimon/spark/sort/TableSorter.java | 36 ++--------------------
13 files changed, 55 insertions(+), 84 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index afe4c50208..abc1616087 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -3226,4 +3226,37 @@ public class CoreOptions implements Serializable {
return value;
}
}
+
+ /**
+ * The order type used when sorting table data. A user-supplied strategy name is resolved to a
+ * constant with {@link #of(String)}.
+ */
+ public enum OrderType {
+ ORDER("order"),
+ ZORDER("zorder"),
+ HILBERT("hilbert"),
+ NONE("none");
+
+ /** Strategy name that {@link #of(String)} compares against, ignoring case. */
+ private final String orderType;
+
+ OrderType(String orderType) {
+ this.orderType = orderType;
+ }
+
+ /** Renders as {@code "order type: <name>"}, e.g. {@code "order type: zorder"}. */
+ @Override
+ public String toString() {
+ return "order type: " + orderType;
+ }
+
+ /**
+ * Resolves a strategy name to its constant, ignoring case.
+ *
+ * @param orderType strategy name such as {@code "zorder"}; {@code null} matches no constant
+ * @return the constant whose name equals {@code orderType} ignoring case
+ * @throws IllegalArgumentException if no constant matches
+ */
+ public static OrderType of(String orderType) {
+ // values() is in declaration order, so the first match wins exactly as in an if/else chain.
+ for (OrderType candidate : values()) {
+ if (candidate.orderType.equalsIgnoreCase(orderType)) {
+ return candidate;
+ }
+ }
+ throw new IllegalArgumentException("cannot match type: " + orderType + " for ordering");
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
b/paimon-common/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
similarity index 98%
rename from
paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
rename to
paimon-common/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
index 14af2a95a7..c06ba8fa06 100644
--- a/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
@@ -47,7 +47,6 @@ import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.types.VariantType;
-import org.apache.paimon.utils.ZOrderByteUtils;
import java.io.Serializable;
import java.nio.ByteBuffer;
@@ -57,8 +56,8 @@ import java.util.List;
import java.util.Set;
import java.util.function.BiFunction;
-import static org.apache.paimon.utils.ZOrderByteUtils.NULL_BYTES;
-import static org.apache.paimon.utils.ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE;
+import static org.apache.paimon.sort.zorder.ZOrderByteUtils.NULL_BYTES;
+import static org.apache.paimon.sort.zorder.ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE;
/** Z-indexer for responsibility to generate z-index. */
public class ZIndexer implements Serializable {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java
b/paimon-common/src/main/java/org/apache/paimon/sort/zorder/ZOrderByteUtils.java
similarity index 99%
rename from
paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java
rename to
paimon-common/src/main/java/org/apache/paimon/sort/zorder/ZOrderByteUtils.java
index 3baff77ede..04bf863a3f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java
+++
b/paimon-common/src/main/java/org/apache/paimon/sort/zorder/ZOrderByteUtils.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.utils;
+package org.apache.paimon.sort.zorder;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java
b/paimon-common/src/test/java/org/apache/paimon/sort/zorder/TestZOrderByteUtil.java
similarity index 99%
rename from
paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java
rename to
paimon-common/src/test/java/org/apache/paimon/sort/zorder/TestZOrderByteUtil.java
index 92786b701c..72ffc17121 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java
+++
b/paimon-common/src/test/java/org/apache/paimon/sort/zorder/TestZOrderByteUtil.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.utils;
+package org.apache.paimon.sort.zorder;
import org.junit.Assert;
import org.junit.Test;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java
b/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java
index c94327817b..fbaf9f90aa 100644
--- a/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java
@@ -25,7 +25,6 @@ import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
-import org.apache.paimon.utils.ZOrderByteUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
index eb545ea437..2724a5da3a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -19,11 +19,11 @@
package org.apache.paimon.flink.action;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.OrderType;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.SortCompactSinkBuilder;
import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
-import org.apache.paimon.flink.sorter.TableSorter.OrderType;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index ecaa5678dd..4fd0edf6f0 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.CoreOptions.OrderType;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
@@ -25,7 +26,6 @@ import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink;
import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
-import org.apache.paimon.flink.sorter.TableSorter.OrderType;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
@@ -51,14 +51,14 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import static org.apache.paimon.CoreOptions.OrderType.HILBERT;
+import static org.apache.paimon.CoreOptions.OrderType.ORDER;
+import static org.apache.paimon.CoreOptions.OrderType.ZORDER;
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
import static org.apache.paimon.flink.FlinkConnectorOptions.MIN_CLUSTERING_SAMPLE_FACTOR;
import static org.apache.paimon.flink.sink.FlinkSink.isStreaming;
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
-import static org.apache.paimon.flink.sorter.TableSorter.OrderType.HILBERT;
-import static org.apache.paimon.flink.sorter.TableSorter.OrderType.ORDER;
-import static org.apache.paimon.flink.sorter.TableSorter.OrderType.ZORDER;
import static org.apache.paimon.table.BucketMode.BUCKET_UNAWARE;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSortInfo.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSortInfo.java
index 9b359e232d..e8948c8027 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSortInfo.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSortInfo.java
@@ -18,8 +18,8 @@
package org.apache.paimon.flink.sorter;
+import org.apache.paimon.CoreOptions.OrderType;
import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.sorter.TableSorter.OrderType;
import java.util.Collections;
import java.util.List;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
index a0d4b6af26..1d51ca4ebe 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.sorter;
+import org.apache.paimon.CoreOptions.OrderType;
import org.apache.paimon.flink.action.SortCompactAction;
import org.apache.paimon.table.FileStoreTable;
@@ -83,34 +84,4 @@ public abstract class TableSorter {
throw new IllegalArgumentException("cannot match order type: " + sortStrategy);
}
}
-
- /** The order type of table sort. */
- public enum OrderType {
- ORDER("order"),
- ZORDER("zorder"),
- HILBERT("hilbert");
-
- private final String orderType;
-
- OrderType(String orderType) {
- this.orderType = orderType;
- }
-
- @Override
- public String toString() {
- return "order type: " + orderType;
- }
-
- public static OrderType of(String orderType) {
- if (ORDER.orderType.equalsIgnoreCase(orderType)) {
- return ORDER;
- } else if (ZORDER.orderType.equalsIgnoreCase(orderType)) {
- return ZORDER;
- } else if (HILBERT.orderType.equalsIgnoreCase(orderType)) {
- return HILBERT;
- }
-
- throw new IllegalArgumentException("cannot match type: " + orderType + " for ordering");
- }
- }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/TableSortInfoTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/TableSortInfoTest.java
index e9e01baead..eae05cd4c8 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/TableSortInfoTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/TableSortInfoTest.java
@@ -18,8 +18,8 @@
package org.apache.paimon.flink.sorter;
+import org.apache.paimon.CoreOptions.OrderType;
import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.sorter.TableSorter.OrderType;
import org.junit.jupiter.api.Test;
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 4a43e39c31..689e2cabd3 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -19,6 +19,7 @@
package org.apache.paimon.spark.procedure;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.CoreOptions.OrderType;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.append.UnawareAppendCompactionTask;
import org.apache.paimon.append.UnawareAppendTableCompactionCoordinator;
@@ -144,7 +145,7 @@ public class CompactProcedure extends BaseProcedure {
String partitions = blank(args, 1) ? null : args.getString(1);
// make full compact strategy as default.
String compactStrategy = blank(args, 2) ? FULL : args.getString(2);
- String sortType = blank(args, 3) ? TableSorter.OrderType.NONE.name() : args.getString(3);
+ String sortType = blank(args, 3) ? OrderType.NONE.name() : args.getString(3);
List<String> sortColumns =
blank(args, 4)
? Collections.emptyList()
@@ -153,11 +154,11 @@ public class CompactProcedure extends BaseProcedure {
String options = args.isNullAt(6) ? null : args.getString(6);
Duration partitionIdleTime =
blank(args, 7) ? null : TimeUtils.parseDuration(args.getString(7));
- if (TableSorter.OrderType.NONE.name().equals(sortType) && !sortColumns.isEmpty()) {
+ if (OrderType.NONE.name().equals(sortType) && !sortColumns.isEmpty()) {
throw new IllegalArgumentException(
"order_strategy \"none\" cannot work with order_by columns.");
}
- if (partitionIdleTime != null && (!TableSorter.OrderType.NONE.name().equals(sortType))) {
+ if (partitionIdleTime != null && (!OrderType.NONE.name().equals(sortType))) {
throw new IllegalArgumentException(
"sort compact do not support 'partition_idle_time'.");
}
@@ -234,7 +235,7 @@ public class CompactProcedure extends BaseProcedure {
@Nullable Expression condition,
@Nullable Duration partitionIdleTime) {
BucketMode bucketMode = table.bucketMode();
- TableSorter.OrderType orderType = TableSorter.OrderType.of(sortType);
+ OrderType orderType = OrderType.of(sortType);
boolean fullCompact = compactStrategy.equalsIgnoreCase(FULL);
Predicate filter =
condition == null
@@ -245,7 +246,7 @@ public class CompactProcedure extends BaseProcedure {
table.rowType(),
false)
.getOrElse(null);
- if (orderType.equals(TableSorter.OrderType.NONE)) {
+ if (orderType.equals(OrderType.NONE)) {
JavaSparkContext javaSparkContext = new JavaSparkContext(spark().sparkContext());
switch (bucketMode) {
case HASH_FIXED:
@@ -474,7 +475,7 @@ public class CompactProcedure extends BaseProcedure {
private void sortCompactUnAwareBucketTable(
FileStoreTable table,
- TableSorter.OrderType orderType,
+ OrderType orderType,
List<String> sortColumns,
DataSourceV2Relation relation,
@Nullable Predicate filter) {
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkZOrderUDF.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkZOrderUDF.java
index c41c75cd85..45858fe62a 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkZOrderUDF.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkZOrderUDF.java
@@ -18,7 +18,7 @@
package org.apache.paimon.spark.sort;
-import org.apache.paimon.utils.ZOrderByteUtils;
+import org.apache.paimon.sort.zorder.ZOrderByteUtils;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.expressions.UserDefinedFunction;
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java
index b76d56a9aa..a96724fad6 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/TableSorter.java
@@ -18,6 +18,7 @@
package org.apache.paimon.spark.sort;
+import org.apache.paimon.CoreOptions.OrderType;
import org.apache.paimon.table.FileStoreTable;
import org.apache.spark.sql.Dataset;
@@ -62,7 +63,7 @@ public abstract class TableSorter {
public abstract Dataset<Row> sort(Dataset<Row> input);
public static TableSorter getSorter(
- FileStoreTable table, TableSorter.OrderType orderType, List<String> orderColumns) {
+ FileStoreTable table, OrderType orderType, List<String> orderColumns) {
switch (orderType) {
case ORDER:
return new OrderSorter(table, orderColumns);
@@ -81,37 +82,4 @@ public abstract class TableSorter {
throw new IllegalArgumentException("cannot match order type: " + orderType);
}
}
-
- /** order type for sorting. */
- public enum OrderType {
- ORDER("order"),
- ZORDER("zorder"),
- HILBERT("hilbert"),
- NONE("none");
-
- private final String orderType;
-
- OrderType(String orderType) {
- this.orderType = orderType;
- }
-
- @Override
- public String toString() {
- return "order type: " + orderType;
- }
-
- public static OrderType of(String orderType) {
- if (ORDER.orderType.equalsIgnoreCase(orderType)) {
- return ORDER;
- } else if (ZORDER.orderType.equalsIgnoreCase(orderType)) {
- return ZORDER;
- } else if (HILBERT.orderType.equalsIgnoreCase(orderType)) {
- return HILBERT;
- } else if (NONE.orderType.equalsIgnoreCase(orderType)) {
- return NONE;
- }
-
- throw new IllegalArgumentException("cannot match type: " + orderType + " for ordering");
- }
- }
}