This is an automated email from the ASF dual-hosted git repository.
yaozhq pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git
The following commit(s) were added to refs/heads/master by this push:
new 49fd5c66 feat: Optimizing Sorting Performance via Radix Sort Algorithm
(#589)
49fd5c66 is described below
commit 49fd5c6666e7f7e1bcd3b5c5a8e1166e816dd46c
Author: Fan Tianlan <[email protected]>
AuthorDate: Mon Oct 13 10:16:52 2025 +0800
feat: Optimizing Sorting Performance via Radix Sort Algorithm (#589)
* Optimizing Sorting Performance via Radix Sort Algorithm
* Fix style
* Optimizing the efficiency of the radix sort algorithm
* Max String Precision
* add tests
* modify pom
* fix style
* fix style2
* fix style2
* refactor equals for BinaryStringType
* concurrency-safe sorting
---
.../java/org/apache/geaflow/common/type/Types.java | 4 +-
.../common/type/primitive/BinaryStringType.java | 31 +++
.../geaflow/dsl/catalog/console/CatalogUtil.java | 6 +-
.../org/apache/geaflow/dsl/util/SqlTypeUtil.java | 9 +-
geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml | 11 +
.../runtime/function/table/OrderByHeapSort.java | 76 +++++
.../runtime/function/table/OrderByRadixSort.java | 59 ++++
.../dsl/runtime/function/table/OrderByTimSort.java | 62 +++++
.../function/table/order/MultiFieldRadixSort.java | 264 ++++++++++++++++++
.../dsl/runtime/function/table/order/SortInfo.java | 20 ++
.../dsl/runtime/plan/PhysicSortRelNode.java | 15 +-
.../runtime/benchmark/OrderMemoryBenchmark.java | 178 ++++++++++++
.../dsl/runtime/benchmark/OrderTimeBenchmark.java | 307 +++++++++++++++++++++
.../geaflow/dsl/runtime/plan/StepPlanTest.java | 2 +-
.../dsl/runtime/query/MultiFieldRadixSortTest.java | 297 ++++++++++++++++++++
.../apache/geaflow/dsl/runtime/query/SortTest.java | 9 +
.../src/test/resources/expect/sort_004.txt | 22 ++
.../resources/query/sort_004.sql} | 42 +--
geaflow/geaflow-dsl/pom.xml | 2 +-
19 files changed, 1385 insertions(+), 31 deletions(-)
diff --git
a/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/Types.java
b/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/Types.java
index ea365a0a..64481553 100644
---
a/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/Types.java
+++
b/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/Types.java
@@ -94,7 +94,7 @@ public class Types {
return TYPE_IMMUTABLE_MAP.get(type);
}
- public static IType<?> of(String typeName) {
+ public static IType<?> of(String typeName, int precision) {
if (typeName == null) {
throw new IllegalArgumentException("typeName is null");
}
@@ -116,7 +116,7 @@ public class Types {
case TYPE_NAME_DECIMAL:
return DECIMAL;
case TYPE_NAME_BINARY_STRING:
- return BINARY_STRING;
+ return new BinaryStringType(precision);
case TYPE_NAME_TIMESTAMP:
return TIMESTAMP;
case TYPE_NAME_DATE:
diff --git
a/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/primitive/BinaryStringType.java
b/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/primitive/BinaryStringType.java
index 1d4bf2be..d9817f07 100644
---
a/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/primitive/BinaryStringType.java
+++
b/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/primitive/BinaryStringType.java
@@ -27,6 +27,37 @@ public class BinaryStringType implements IType<BinaryString>
{
public static final BinaryStringType INSTANCE = new BinaryStringType();
+    /**
+     * Declared maximum length of this string type. It stays 0 when the type is created through
+     * the no-argument constructor (as {@link #INSTANCE} is). NOTE(review): some callers pass -1
+     * for "unknown length" - confirm the convention for 0 vs. negative values.
+     */
+    private int precision;
+
+    /** Creates a string type without a declared precision (precision stays 0). */
+    public BinaryStringType() {
+    }
+
+    /**
+     * Creates a string type carrying a declared precision.
+     *
+     * @param precision declared maximum length of the string
+     */
+    public BinaryStringType(int precision) {
+        this.precision = precision;
+    }
+
+    /** Returns the declared precision; 0 when none was supplied to the constructor. */
+    public int getPrecision() {
+        return precision;
+    }
+
+    /**
+     * Two string types are equal when they have exactly the same runtime class. Precision is not
+     * compared, so instances with different precisions are equal.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        return obj != null && getClass() == obj.getClass();
+    }
+
+    /**
+     * Consistent with {@link #equals(Object)}. Precision does not take part in equality, so all
+     * instances of the same class must share one hash code.
+     */
+    @Override
+    public int hashCode() {
+        return getClass().hashCode();
+    }
+
@Override
public String getName() {
return Types.TYPE_NAME_BINARY_STRING;
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-catalog/src/main/java/org/apache/geaflow/dsl/catalog/console/CatalogUtil.java
b/geaflow/geaflow-dsl/geaflow-dsl-catalog/src/main/java/org/apache/geaflow/dsl/catalog/console/CatalogUtil.java
index e3aa4db6..6f200a17 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-catalog/src/main/java/org/apache/geaflow/dsl/catalog/console/CatalogUtil.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-catalog/src/main/java/org/apache/geaflow/dsl/catalog/console/CatalogUtil.java
@@ -98,7 +98,7 @@ public class CatalogUtil {
idFieldName = fieldModel.getName();
}
String typeName = convertTypeName(fieldModel.getType().name());
- IType<?> fieldType = Types.of(typeName);
+ IType<?> fieldType = Types.of(typeName, -1);
TableField field = new TableField(fieldModel.getName(), fieldType,
false);
fields.add(field);
}
@@ -154,7 +154,7 @@ public class CatalogUtil {
default:
}
String typeName = convertTypeName(fieldModel.getType().name());
- IType<?> fieldType = Types.of(typeName);
+ IType<?> fieldType = Types.of(typeName, -1);
TableField field = new TableField(fieldModel.getName(), fieldType,
false);
fields.add(field);
}
@@ -237,7 +237,7 @@ public class CatalogUtil {
List<TableField> fields = new ArrayList<>(fieldModels.size());
for (FieldModel fieldModel : fieldModels) {
String typeName = convertTypeName(fieldModel.getType().name());
- IType<?> fieldType = Types.of(typeName);
+ IType<?> fieldType = Types.of(typeName, -1);
TableField field = new TableField(fieldModel.getName(), fieldType,
false);
fields.add(field);
}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-parser/src/main/java/org/apache/geaflow/dsl/util/SqlTypeUtil.java
b/geaflow/geaflow-dsl/geaflow-dsl-parser/src/main/java/org/apache/geaflow/dsl/util/SqlTypeUtil.java
index a79d3c24..dc66526a 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-parser/src/main/java/org/apache/geaflow/dsl/util/SqlTypeUtil.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-parser/src/main/java/org/apache/geaflow/dsl/util/SqlTypeUtil.java
@@ -29,6 +29,7 @@ import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.geaflow.common.type.IType;
import org.apache.geaflow.common.type.Types;
+import org.apache.geaflow.common.type.primitive.BinaryStringType;
import org.apache.geaflow.dsl.calcite.EdgeRecordType;
import org.apache.geaflow.dsl.calcite.GraphRecordType;
import org.apache.geaflow.dsl.calcite.PathRecordType;
@@ -47,7 +48,7 @@ public final class SqlTypeUtil {
public static IType<?> convertType(SqlDataTypeSpec typeSpec) {
String typeName = typeSpec.getTypeName().getSimple().toUpperCase();
typeName = convertTypeName(typeName);
- return Types.of(typeName);
+ return Types.of(typeName, typeSpec.getPrecision()); // Types.of uses precision only for BINARY_STRING. NOTE(review): confirm the value returned when no length is declared (CatalogUtil passes -1 for unknown).
}
public static IType<?> convertType(RelDataType type) {
@@ -90,7 +91,7 @@ public final class SqlTypeUtil {
public static IType<?> ofTypeName(SqlTypeName sqlTypeName) {
String typeName = convertTypeName(sqlTypeName.getName());
- return Types.of(typeName);
+ return Types.of(typeName, sqlTypeName.getPrecision()); // NOTE(review): precision is read from the SqlTypeName itself, not a concrete type instance - confirm what it yields for string types.
}
public static RelDataType convertToRelType(IType<?> type, boolean
isNullable,
@@ -129,7 +130,9 @@ public final class SqlTypeUtil {
default:
if (type.isPrimitive()) {
String sqlTypeName = convertToSqlTypeName(type);
- SqlTypeName typeName = SqlTypeName.valueOf(sqlTypeName);
+ SqlTypeName typeName = Types.getType(type.getTypeClass())
== Types.BINARY_STRING
+ ? SqlTypeName.get(sqlTypeName, ((BinaryStringType)
type).getPrecision())
+ : SqlTypeName.get(sqlTypeName);
return
typeFactory.createTypeWithNullability(typeFactory.createSqlType(typeName),
isNullable);
} else {
throw new GeaFlowDSLException("Not support type: " + type);
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml
index 111d3608..042dfcfc 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml
@@ -116,6 +116,17 @@
<artifactId>testng</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-generator-annprocess</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<!-- include kafka server for tests -->
<groupId>org.apache.kafka</groupId>
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByHeapSort.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByHeapSort.java
new file mode 100644
index 00000000..dc57095c
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByHeapSort.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.runtime.function.table;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.PriorityQueue;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.function.FunctionContext;
+import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo;
+import org.apache.geaflow.dsl.runtime.function.table.order.TopNRowComparator;
+
+public class OrderByHeapSort implements OrderByFunction {
+
+ private final SortInfo sortInfo;
+
+ private PriorityQueue<Row> topNQueue;
+
+ private TopNRowComparator<Row> topNRowComparator;
+
+ public OrderByHeapSort(SortInfo sortInfo) {
+ this.sortInfo = sortInfo;
+ }
+
+ @Override
+ public void open(FunctionContext context) {
+ this.topNRowComparator = new TopNRowComparator<>(sortInfo);
+ this.topNQueue = new PriorityQueue<>(
+ sortInfo.fetch, topNRowComparator.getNegativeComparator());
+ }
+
+ @Override
+ public void process(Row row) {
+ if (topNQueue.size() == sortInfo.fetch) {
+ if (sortInfo.orderByFields.isEmpty()) {
+ return;
+ }
+ Row top = topNQueue.peek();
+ if (topNQueue.comparator().compare(top, row) < 0) {
+ topNQueue.remove();
+ topNQueue.add(row);
+ }
+ } else {
+ topNQueue.add(row);
+ }
+ }
+
+ @Override
+ public Iterable<Row> finish() {
+ List<Row> results = new ArrayList<>();
+ while (!topNQueue.isEmpty()) {
+ results.add(topNQueue.remove());
+ }
+ Collections.reverse(results);
+ topNQueue.clear();
+ return results;
+ }
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByRadixSort.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByRadixSort.java
new file mode 100644
index 00000000..515d2c48
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByRadixSort.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.runtime.function.table;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.function.FunctionContext;
+import org.apache.geaflow.dsl.runtime.function.table.order.MultiFieldRadixSort;
+import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo;
+
+/**
+ * Order-by that buffers all rows and sorts them with {@link MultiFieldRadixSort} when finished.
+ */
+public class OrderByRadixSort implements OrderByFunction {
+
+    private final SortInfo sortInfo;
+
+    /** Rows buffered by {@link #process(Row)} until {@link #finish()} sorts them. */
+    private List<Row> allRows;
+
+    public OrderByRadixSort(SortInfo sortInfo) {
+        this.sortInfo = sortInfo;
+    }
+
+    @Override
+    public void open(FunctionContext context) {
+        this.allRows = new ArrayList<>();
+    }
+
+    @Override
+    public void process(Row row) {
+        // fetch == 0: nothing can be emitted, so there is no point buffering.
+        if (sortInfo.fetch == 0) {
+            return;
+        }
+        allRows.add(row);
+    }
+
+    /**
+     * Sorts the buffered rows and returns them. When {@code sortInfo.fetch} is positive, only the
+     * first fetch rows are returned. The buffer is replaced by an empty one.
+     */
+    @Override
+    public Iterable<Row> finish() {
+        List<Row> sortedRows = allRows;
+        allRows = new ArrayList<>();
+        MultiFieldRadixSort.multiFieldRadixSort(sortedRows, sortInfo);
+        if (sortInfo.fetch > 0 && sortedRows.size() > sortInfo.fetch) {
+            return new ArrayList<>(sortedRows.subList(0, sortInfo.fetch));
+        }
+        return sortedRows;
+    }
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByTimSort.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByTimSort.java
new file mode 100644
index 00000000..6b4e216d
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByTimSort.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.runtime.function.table;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.function.FunctionContext;
+import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo;
+import org.apache.geaflow.dsl.runtime.function.table.order.TopNRowComparator;
+
+/**
+ * Order-by that buffers all rows and sorts them with {@link List#sort} and a
+ * {@link TopNRowComparator} when finished.
+ */
+public class OrderByTimSort implements OrderByFunction {
+
+    private final SortInfo sortInfo;
+
+    /** Rows buffered by {@link #process(Row)} until {@link #finish()} sorts them. */
+    private List<Row> allRows;
+
+    private TopNRowComparator<Row> topNRowComparator;
+
+    public OrderByTimSort(SortInfo sortInfo) {
+        this.sortInfo = sortInfo;
+    }
+
+    @Override
+    public void open(FunctionContext context) {
+        this.topNRowComparator = new TopNRowComparator<>(sortInfo);
+        this.allRows = new ArrayList<>();
+    }
+
+    @Override
+    public void process(Row row) {
+        // fetch == 0: nothing can be emitted, so there is no point buffering.
+        if (sortInfo.fetch == 0) {
+            return;
+        }
+        allRows.add(row);
+    }
+
+    /**
+     * Sorts the buffered rows (stable) and returns them. When {@code sortInfo.fetch} is positive,
+     * only the first fetch rows are returned. The buffer is replaced by an empty one.
+     */
+    @Override
+    public Iterable<Row> finish() {
+        List<Row> sortedRows = allRows;
+        allRows = new ArrayList<>();
+        sortedRows.sort(topNRowComparator);
+        if (sortInfo.fetch > 0 && sortedRows.size() > sortInfo.fetch) {
+            return new ArrayList<>(sortedRows.subList(0, sortInfo.fetch));
+        }
+        return sortedRows;
+    }
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java
new file mode 100644
index 00000000..223ebecb
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.runtime.function.table.order;
+
+import java.util.List;
+import org.apache.geaflow.common.binary.BinaryString;
+import org.apache.geaflow.dsl.common.data.Row;
+
+public class MultiFieldRadixSort {
+
+ private static final ThreadLocal<Integer> dataSize = new ThreadLocal<>();
+ private static final ThreadLocal<int[]> intValues = new ThreadLocal<>();
+ private static final ThreadLocal<int[]> sortedIntValues = new
ThreadLocal<>();
+ private static final ThreadLocal<int[]> charCodes = new ThreadLocal<>();
+ private static final ThreadLocal<byte[]> digits = new ThreadLocal<>();
+ private static final ThreadLocal<String[]> stringValues = new
ThreadLocal<>();
+ private static final ThreadLocal<String[]> sortedStringValues = new
ThreadLocal<>();
+ private static final ThreadLocal<Row[]> srcData = new ThreadLocal<>();
+ private static final ThreadLocal<Row[]> dstData = new ThreadLocal<>();
+
+ /**
+ * Multi-field radix sort.
+ */
+ public static void multiFieldRadixSort(List<Row> data, SortInfo sortInfo) {
+ if (data == null || data.size() <= 1) {
+ return;
+ }
+ int size = data.size();
+
+ try {
+ dataSize.set(size);
+ intValues.set(new int[size]);
+ sortedIntValues.set(new int[size]);
+ charCodes.set(new int[size]);
+ digits.set(new byte[size]);
+ stringValues.set(new String[size]);
+ sortedStringValues.set(new String[size]);
+ srcData.set(data.toArray(new Row[0]));
+ dstData.set(new Row[size]);
+
+ // Sort by field with the lowest priority.
+ List<OrderByField> fields = sortInfo.orderByFields;
+
+ for (int i = fields.size() - 1; i >= 0; i--) {
+ OrderByField field = fields.get(i);
+ if (field.expression.getOutputType().getTypeClass() ==
Integer.class) {
+ radixSortByIntField(field);
+ } else {
+ radixSortByStringField(field);
+ }
+ }
+
+ Row[] finalData = srcData.get();
+ for (int j = 0; j < size; j++) {
+ data.set(j, finalData[j]);
+ }
+ } finally {
+ dataSize.remove();
+ intValues.remove();
+ sortedIntValues.remove();
+ charCodes.remove();
+ digits.remove();
+ stringValues.remove();
+ sortedStringValues.remove();
+ srcData.remove();
+ dstData.remove();
+ }
+ }
+
+ /**
+ * Radix sort by integer field.
+ */
+ private static void radixSortByIntField(OrderByField field) {
+ int size = dataSize.get();
+ int[] intVals = intValues.get();
+ byte[] digs = digits.get();
+ Row[] src = srcData.get();
+
+ // Determine the number of digits.
+ int max = Integer.MIN_VALUE;
+ int min = Integer.MAX_VALUE;
+ boolean hasNull = false;
+
+ for (int i = 0; i < size; i++) {
+ Integer value = (Integer) field.expression.evaluate(src[i]);
+ if (value != null) {
+ intVals[i] = value;
+ max = value > max ? value : max;
+ min = value < min ? value : min;
+ } else {
+ intVals[i] = Integer.MIN_VALUE;
+ hasNull = true;
+ }
+ }
+ if (hasNull) {
+ min--;
+ }
+
+ // Handling negative numbers: Add the offset to all numbers to make
them positive.
+ final int offset = min < 0 ? -min : 0;
+ max += offset;
+
+ for (int i = 0; i < size; i++) {
+ if (intVals[i] == Integer.MIN_VALUE) {
+ intVals[i] = min;
+ }
+ intVals[i] += offset;
+ }
+
+ // Bitwise sorting.
+ for (int exp = 1; max / exp > 0; exp *= 10) {
+ for (int j = 0; j < size; j++) {
+ digs[j] = (byte) (intVals[j] / exp % 10);
+ }
+ countingSortByDigit(field.order.value > 0);
+ }
+ }
+
+ /**
+ * Radix sorting by string field.
+ */
+ private static void radixSortByStringField(OrderByField field) {
+ int size = dataSize.get();
+ String[] strVals = stringValues.get();
+ Row[] src = srcData.get();
+
+ // Precompute all strings to avoid repeated evaluation and toString.
+ int maxLength = 0;
+
+ for (int i = 0; i < size; i++) {
+ BinaryString binaryString = (BinaryString)
field.expression.evaluate(src[i]);
+ strVals[i] = binaryString != null ? binaryString.toString() : "";
+ maxLength = Math.max(maxLength, strVals[i].length());
+ }
+
+ // Sort from the last digit of the string.
+ for (int pos = maxLength - 1; pos >= 0; pos--) {
+ countingSortByChar(field.order.value > 0, pos);
+ }
+ }
+
+ /**
+ * Sort by the specified number of digits (integer).
+ */
+ private static void countingSortByDigit(boolean ascending) {
+ int size = dataSize.get();
+ byte[] digs = digits.get();
+ int[] intVals = intValues.get();
+ int[] sortedIntVals = sortedIntValues.get();
+ Row[] src = srcData.get();
+ Row[] dst = dstData.get();
+
+ int[] count = new int[10];
+
+ // Count the number of times each number appears.
+ for (int i = 0; i < size; i++) {
+ count[digs[i]]++;
+ }
+
+ // Calculate cumulative count.
+ if (ascending) {
+ for (int i = 1; i < 10; i++) {
+ count[i] += count[i - 1];
+ }
+ } else {
+ for (int i = 8; i >= 0; i--) {
+ count[i] += count[i + 1];
+ }
+ }
+
+ // Build the output array from back to front (to ensure stability).
+ for (int i = size - 1; i >= 0; i--) {
+ int index = --count[digs[i]];
+ dst[index] = src[i];
+ sortedIntVals[index] = intVals[i];
+ }
+
+ int[] intTmp = intVals;
+ intValues.set(sortedIntVals);
+ sortedIntValues.set(intTmp);
+
+ Row[] rowTmp = src;
+ srcData.set(dst);
+ dstData.set(rowTmp);
+ }
+
+ /**
+ * Sort by the specified number of digits (string).
+ */
+ private static void countingSortByChar(boolean ascending, int pos) {
+ int size = dataSize.get();
+ String[] strVals = stringValues.get();
+ String[] sortedStrVals = sortedStringValues.get();
+ int[] charCds = charCodes.get();
+ Row[] src = srcData.get();
+ Row[] dst = dstData.get();
+
+ // Precompute all strings and character codes to avoid repeated
evaluate and toString.
+ int minChar = Integer.MAX_VALUE;
+ int maxChar = Integer.MIN_VALUE;
+
+ for (int i = 0; i < size; i++) {
+ String value = strVals[i];
+ if (pos < value.length()) {
+ int charCode = value.codePointAt(pos);
+ charCds[i] = charCode;
+ minChar = Math.min(minChar, charCode);
+ maxChar = Math.max(maxChar, charCode);
+ }
+ }
+ int range = maxChar - minChar + 2;
+ int[] count = new int[range];
+
+ for (int i = 0; i < size; i++) {
+ if (pos < strVals[i].length()) {
+ charCds[i] -= (minChar - 1);
+ } else {
+ charCds[i] = 0; // null character
+ }
+ count[charCds[i]]++;
+ }
+
+ if (ascending) {
+ for (int i = 1; i < range; i++) {
+ count[i] += count[i - 1];
+ }
+ } else {
+ for (int i = range - 2; i >= 0; i--) {
+ count[i] += count[i + 1];
+ }
+ }
+
+ for (int i = size - 1; i >= 0; i--) {
+ int index = --count[charCds[i]];
+ dst[index] = src[i];
+ sortedStrVals[index] = strVals[i];
+ }
+
+ String[] stringTmp = strVals;
+ stringValues.set(sortedStrVals);
+ sortedStringValues.set(stringTmp);
+
+ Row[] rowTmp = src;
+ srcData.set(dst);
+ dstData.set(rowTmp);
+ }
+}
\ No newline at end of file
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java
index b7bbf238..c90c9e21 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java
@@ -23,6 +23,9 @@ import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import org.apache.geaflow.common.binary.BinaryString;
+import org.apache.geaflow.common.type.IType;
+import org.apache.geaflow.common.type.primitive.BinaryStringType;
public class SortInfo implements Serializable {
@@ -36,4 +39,21 @@ public class SortInfo implements Serializable {
sortInfo.fetch = this.fetch;
return sortInfo;
}
+
+    /**
+     * Longest declared string precision that is still radix sorted. Each character position
+     * costs one counting-sort pass. 24 covers e.g. a MongoDB ObjectId (24 hexadecimal chars).
+     */
+    private static final int MAX_RADIX_STRING_PRECISION = 24;
+
+    /**
+     * Returns whether every order-by field can be sorted with the multi-field radix sort. That
+     * holds for {@code Integer} fields, and for {@link BinaryStringType} fields whose declared
+     * precision is in [1, {@value #MAX_RADIX_STRING_PRECISION}]. A precision of 0 (no-argument
+     * constructor) or a negative one is treated as an unknown, unbounded length.
+     */
+    public boolean isRadixSortable() {
+        for (OrderByField field : this.orderByFields) {
+            IType<?> orderType = field.expression.getOutputType();
+            Class<?> typeClass = orderType.getTypeClass();
+            if (typeClass == Integer.class) {
+                continue;
+            }
+            // Guard the cast: only BinaryStringType exposes a precision.
+            if (typeClass != BinaryString.class || !(orderType instanceof BinaryStringType)) {
+                return false;
+            }
+            int precision = ((BinaryStringType) orderType).getPrecision();
+            if (precision <= 0 || precision > MAX_RADIX_STRING_PRECISION) {
+                return false;
+            }
+        }
+        return true;
+    }
}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/plan/PhysicSortRelNode.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/plan/PhysicSortRelNode.java
index 08c2179a..7823ecee 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/plan/PhysicSortRelNode.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/plan/PhysicSortRelNode.java
@@ -38,7 +38,9 @@ import org.apache.geaflow.dsl.runtime.RuntimeTable;
import org.apache.geaflow.dsl.runtime.expression.ExpressionTranslator;
import org.apache.geaflow.dsl.runtime.expression.field.FieldExpression;
import org.apache.geaflow.dsl.runtime.function.table.OrderByFunction;
-import org.apache.geaflow.dsl.runtime.function.table.OrderByFunctionImpl;
+import org.apache.geaflow.dsl.runtime.function.table.OrderByHeapSort;
+import org.apache.geaflow.dsl.runtime.function.table.OrderByRadixSort;
+import org.apache.geaflow.dsl.runtime.function.table.OrderByTimSort;
import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField;
import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField.ORDER;
import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo;
@@ -69,8 +71,15 @@ public class PhysicSortRelNode extends Sort implements
PhysicRelNode<RuntimeTabl
public RuntimeTable translate(QueryContext context) {
SortInfo sortInfo = buildSortInfo();
RDataView dataView = ((PhysicRelNode<?>)
getInput()).translate(context);
-
- OrderByFunction orderByFunction = new OrderByFunctionImpl(sortInfo);
+
+ OrderByFunction orderByFunction;
+ if (sortInfo.fetch > 0) {
+ orderByFunction = new OrderByHeapSort(sortInfo);
+ } else if (sortInfo.isRadixSortable()) {
+ orderByFunction = new OrderByRadixSort(sortInfo);
+ } else {
+ orderByFunction = new OrderByTimSort(sortInfo);
+ }
if (dataView.getType() == ViewType.TABLE) {
return ((RuntimeTable) dataView).orderBy(orderByFunction);
} else {
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderMemoryBenchmark.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderMemoryBenchmark.java
new file mode 100644
index 00000000..94a8a6fd
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderMemoryBenchmark.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.runtime.benchmark;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import org.apache.geaflow.common.type.Types;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.runtime.expression.Expression;
+import org.apache.geaflow.dsl.runtime.expression.field.FieldExpression;
+import org.apache.geaflow.dsl.runtime.function.table.OrderByFunction;
+import org.apache.geaflow.dsl.runtime.function.table.OrderByHeapSort;
+import org.apache.geaflow.dsl.runtime.function.table.OrderByRadixSort;
+import org.apache.geaflow.dsl.runtime.function.table.OrderByTimSort;
+import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField;
+import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField.ORDER;
+import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo;
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.profile.GCProfiler;
+import org.openjdk.jmh.results.format.ResultFormatType;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * JMH benchmark that runs the heap, radix and tim sort order-by implementations over one
+ * ascending integer column. {@link #main} attaches the GC profiler.
+ */
+@BenchmarkMode(Mode.SingleShotTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+@Measurement(iterations = 10)
+@Fork(1)
+public class OrderMemoryBenchmark {
+
+    @Param({"10000", "100000", "1000000"})
+    private int dataSize;
+
+    @Param({"100", "1000", "10000"})
+    private int topN;
+
+    private OrderByFunction orderByFunction;
+    private List<Row> testData;
+    private SortInfo sortInfo = new SortInfo();
+
+    /** Builds the single ascending sort key on column 0 and the random input rows. */
+    @Setup(Level.Trial)
+    public void setup() {
+        Expression sortKey = new FieldExpression(0, Types.INTEGER);
+        OrderByField byFirstColumn = new OrderByField();
+        byFirstColumn.expression = sortKey;
+        byFirstColumn.order = ORDER.ASC;
+
+        List<OrderByField> fields = new ArrayList<>(1);
+        fields.add(byFirstColumn);
+        sortInfo.orderByFields = fields;
+        sortInfo.fetch = topN;
+
+        testData = generateTestData();
+    }
+
+    /** Produces dataSize single-column rows from a fixed-seed generator. */
+    private List<Row> generateTestData() {
+        Random random = new Random(42);
+        List<Row> rows = new ArrayList<>(dataSize);
+        for (int n = 0; n < dataSize; n++) {
+            rows.add(ObjectRow.create(new Object[]{random.nextInt(dataSize * 10)}));
+        }
+        return rows;
+    }
+
+    @Benchmark
+    public Iterable<Row> benchmarkHeapSortMemory() {
+        return runSort(new OrderByHeapSort(sortInfo));
+    }
+
+    @Benchmark
+    public Iterable<Row> benchmarkRadixSortMemory() {
+        return runSort(new OrderByRadixSort(sortInfo));
+    }
+
+    @Benchmark
+    public Iterable<Row> benchmarkTimSortMemory() {
+        return runSort(new OrderByTimSort(sortInfo));
+    }
+
+    /** Feeds a fresh copy of the test data through {@code function} and returns its output. */
+    private Iterable<Row> runSort(OrderByFunction function) {
+        // Each run works on its own copy of the input list.
+        List<Row> inputData = new ArrayList<>(testData);
+        orderByFunction = function;
+        orderByFunction.open(null);
+        for (int i = 0; i < dataSize; i++) {
+            orderByFunction.process(inputData.get(i));
+        }
+        return orderByFunction.finish();
+    }
+
+    /** Prints a title line, then every row on one line, then a line break. */
+    private static void printRows(String title, Iterable<Row> rows) {
+        System.out.println(title);
+        for (Row row : rows) {
+            System.out.print(row);
+        }
+        System.out.println();
+    }
+
+    public static void main(String[] args) throws RunnerException {
+        // Sanity check on a tiny input before the real JMH run.
+        OrderMemoryBenchmark benchmark = new OrderMemoryBenchmark();
+        benchmark.dataSize = 10;
+        benchmark.topN = 10;
+        benchmark.setup();
+        printRows("===HEAP_SORT===", benchmark.benchmarkHeapSortMemory());
+        printRows("===RADIX_SORT===", benchmark.benchmarkRadixSortMemory());
+        printRows("===TIM_SORT===", benchmark.benchmarkTimSortMemory());
+
+        String timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date());
+        String resultFile = "target/benchmark-results/memory-" + timestamp + ".json";
+
+        Options options = new OptionsBuilder()
+            .include(OrderMemoryBenchmark.class.getSimpleName())
+            .addProfiler(GCProfiler.class)
+            .jvmArgs("-Xms2g", "-Xmx4g")
+            .result(resultFile)
+            .resultFormat(ResultFormatType.JSON)
+            .build();
+        new Runner(options).run();
+    }
+}
\ No newline at end of file
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderTimeBenchmark.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderTimeBenchmark.java
new file mode 100644
index 00000000..96bdf77e
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderTimeBenchmark.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.runtime.benchmark;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import org.apache.geaflow.common.binary.BinaryString;
+import org.apache.geaflow.common.type.IType;
+import org.apache.geaflow.common.type.Types;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.runtime.expression.Expression;
+import org.apache.geaflow.dsl.runtime.expression.field.FieldExpression;
+import org.apache.geaflow.dsl.runtime.function.table.OrderByFunction;
+import org.apache.geaflow.dsl.runtime.function.table.OrderByHeapSort;
+import org.apache.geaflow.dsl.runtime.function.table.OrderByRadixSort;
+import org.apache.geaflow.dsl.runtime.function.table.OrderByTimSort;
+import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField;
+import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField.ORDER;
+import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo;
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.results.format.ResultFormatType;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS)
+@Fork(2)
+public class OrderTimeBenchmark {
+    private static final String CHARS =
+        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
+
+    /** Prefixes used by the DUPLICATED string pattern. */
+    private static final String[] DUPLICATE_PREFIXES = {"A", "B", "C"};
+
+    /** Directory (relative to the working directory) that receives the JMH JSON report. */
+    private static final String RESULT_DIR = "target/benchmark-results";
+
+    @Param({"1000"})
+    private int dataSize;
+
+    @Param({"1000"})
+    private int topN;
+
+    @Param({"RANDOM", "SORTED", "REVERSE_SORTED", "PARTIAL_SORTED", "DUPLICATED"})
+    private String dataPattern;
+
+    @Param({"STRING"})
+    private String dataType;
+
+    private OrderByFunction orderByFunction;
+    private List<Row> testData;
+    private SortInfo sortInfo = new SortInfo();
+
+    /** Builds the sort specification and the deterministic input rows once per trial. */
+    @Setup(Level.Trial)
+    public void setupBenchmark() {
+        setupOrderByExpressions();
+        testData = generateTestData();
+    }
+
+    /**
+     * Configures a two-key ORDER BY: field 0 ascending, typed by {@code dataType} (unknown types
+     * fall back to INTEGER), then field 1 (INTEGER) ascending; {@code fetch} is set to {@code topN}.
+     */
+    private void setupOrderByExpressions() {
+        IType<?> fieldType;
+        switch (dataType) {
+            case "DOUBLE":
+                fieldType = Types.DOUBLE;
+                break;
+            case "STRING":
+                fieldType = Types.BINARY_STRING;
+                break;
+            case "INTEGER":
+            default:
+                fieldType = Types.INTEGER;
+        }
+
+        List<OrderByField> orderByFields = new ArrayList<>(2);
+        // Primary sort field.
+        orderByFields.add(ascending(new FieldExpression(0, fieldType)));
+        // Secondary sort field, so the benchmark also exercises multi-field sorting.
+        orderByFields.add(ascending(new FieldExpression(1, Types.INTEGER)));
+
+        sortInfo.orderByFields = orderByFields;
+        sortInfo.fetch = topN;
+    }
+
+    /** Wraps an expression in an ascending {@link OrderByField}. */
+    private static OrderByField ascending(Expression expression) {
+        OrderByField field = new OrderByField();
+        field.expression = expression;
+        field.order = ORDER.ASC;
+        return field;
+    }
+
+    /** Generates {@code dataSize} two-field rows: (primary key of {@code dataType}, int in [0, 100)). */
+    private List<Row> generateTestData() {
+        List<Row> data = new ArrayList<>(dataSize);
+        Random random = new Random(42); // A fixed seed keeps the data reproducible across runs.
+
+        for (int i = 0; i < dataSize; i++) {
+            Object[] values = new Object[2];
+            values[0] = generatePrimaryValue(i, random);
+            values[1] = random.nextInt(100);
+            data.add(ObjectRow.create(values));
+        }
+        return data;
+    }
+
+    /**
+     * Produces the primary sort value for row {@code index}. Unknown data types fall back to
+     * INTEGER, matching {@link #setupOrderByExpressions()}, instead of yielding an empty data set.
+     */
+    private Object generatePrimaryValue(int index, Random random) {
+        switch (dataType) {
+            case "DOUBLE":
+                return generateDoubleValue(index, random);
+            case "STRING":
+                return BinaryString.fromString(generateStringValue(index, random));
+            case "INTEGER":
+            default:
+                return generateIntegerValue(index, random);
+        }
+    }
+
+    /** Exclusive bound for DUPLICATED values; at least 1 so small data sizes don't break nextInt. */
+    private int duplicateBound() {
+        return Math.max(1, dataSize / 10);
+    }
+
+    private Integer generateIntegerValue(int index, Random random) {
+        switch (dataPattern) {
+            case "RANDOM":
+                return random.nextInt(dataSize * 10);
+            case "SORTED":
+                return index;
+            case "REVERSE_SORTED":
+                return dataSize - index;
+            case "PARTIAL_SORTED":
+                // 70% ordered, 30% random.
+                return index < dataSize * 0.7 ? index : random.nextInt(dataSize);
+            case "DUPLICATED":
+                // Many repeated values.
+                return random.nextInt(duplicateBound());
+            default:
+                return random.nextInt(dataSize);
+        }
+    }
+
+    private Double generateDoubleValue(int index, Random random) {
+        switch (dataPattern) {
+            case "RANDOM":
+                return random.nextDouble() * dataSize * 10;
+            case "SORTED":
+                return (double) index + random.nextDouble();
+            case "REVERSE_SORTED":
+                return (double) (dataSize - index) + random.nextDouble();
+            case "PARTIAL_SORTED":
+                return index < dataSize * 0.7
+                    ? (double) index + random.nextDouble()
+                    : random.nextDouble() * dataSize;
+            case "DUPLICATED":
+                return (double) random.nextInt(duplicateBound()) + random.nextDouble();
+            default:
+                return random.nextDouble() * dataSize;
+        }
+    }
+
+    private String generateStringValue(int index, Random random) {
+        switch (dataPattern) {
+            case "RANDOM":
+                return generateRandomString(1, 101, random);
+            case "SORTED":
+                return String.format("R%0100d", index);
+            case "REVERSE_SORTED":
+                return String.format("R%0100d", dataSize - index);
+            case "PARTIAL_SORTED":
+                return index < dataSize * 0.7
+                    ? String.format("R%0100d", index)
+                    : generateRandomString(1, 101, random);
+            case "DUPLICATED":
+                return DUPLICATE_PREFIXES[random.nextInt(DUPLICATE_PREFIXES.length)]
+                    + String.format("%0100d", random.nextInt(duplicateBound()));
+            default:
+                return String.format("R%0100d", random.nextInt(dataSize));
+        }
+    }
+
+    /** Returns a string of exactly {@code length} characters drawn from {@link #CHARS}. */
+    private String generateRandomString(int length, Random random) {
+        StringBuilder sb = new StringBuilder(length);
+        for (int i = 0; i < length; i++) {
+            sb.append(CHARS.charAt(random.nextInt(CHARS.length())));
+        }
+        return sb.toString();
+    }
+
+    /** Returns a random string whose length is uniformly chosen from [minLength, maxLength]. */
+    private String generateRandomString(int minLength, int maxLength, Random random) {
+        int length = minLength + random.nextInt(maxLength - minLength + 1);
+        return generateRandomString(length, random);
+    }
+
+    @Benchmark
+    public Iterable<Row> benchmarkHeapSort() {
+        return runSort(new OrderByHeapSort(sortInfo));
+    }
+
+    @Benchmark
+    public Iterable<Row> benchmarkRadixSort() {
+        return runSort(new OrderByRadixSort(sortInfo));
+    }
+
+    @Benchmark
+    public Iterable<Row> benchmarkTimSort() {
+        return runSort(new OrderByTimSort(sortInfo));
+    }
+
+    /** Feeds every generated row into {@code function} and returns its (Top-N) sorted output. */
+    private Iterable<Row> runSort(OrderByFunction function) {
+        // Copy the input so each invocation works on its own list and cannot pollute shared state.
+        List<Row> inputData = new ArrayList<>(testData);
+
+        orderByFunction = function;
+        orderByFunction.open(null);
+        for (int i = 0; i < dataSize; i++) {
+            orderByFunction.process(inputData.get(i));
+        }
+        return orderByFunction.finish();
+    }
+
+    /**
+     * Runs a quick correctness check of the three implementations on a tiny input and then launches
+     * the JMH time benchmark, writing a JSON report under {@link #RESULT_DIR}.
+     *
+     * @param args ignored
+     * @throws RunnerException if JMH fails to run the benchmarks
+     */
+    public static void main(String[] args) throws RunnerException {
+        // Sanity-check all implementations on a tiny input before the (long) JMH run.
+        OrderTimeBenchmark benchmark = new OrderTimeBenchmark();
+        benchmark.dataSize = 10;
+        benchmark.topN = 10;
+        benchmark.dataPattern = "RANDOM";
+        benchmark.dataType = "INTEGER";
+        benchmark.setupBenchmark();
+        printVerification("HEAP_SORT", benchmark.benchmarkHeapSort());
+        printVerification("RADIX_SORT", benchmark.benchmarkRadixSort());
+        printVerification("TIM_SORT", benchmark.benchmarkTimSort());
+
+        // JMH aborts when it cannot create the result file, so make sure its directory exists.
+        new java.io.File(RESULT_DIR).mkdirs();
+        String timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date());
+        String resultFile = RESULT_DIR + "/time-" + timestamp + ".json";
+
+        Options opt = new OptionsBuilder()
+            .include(OrderTimeBenchmark.class.getSimpleName())
+            .jvmArgs("-Xms4g", "-Xmx8g", "-XX:+UseG1GC")
+            .result(resultFile)
+            .resultFormat(ResultFormatType.JSON)
+            .build();
+
+        new Runner(opt).run();
+    }
+
+    /** Prints one labelled verification section, one row per line so results stay readable. */
+    private static void printVerification(String label, Iterable<Row> rows) {
+        System.out.println("===" + label + "===");
+        for (Row row : rows) {
+            System.out.println(row);
+        }
+    }
+}
\ No newline at end of file
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/plan/StepPlanTest.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/plan/StepPlanTest.java
index eb08dd52..82d72af5 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/plan/StepPlanTest.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/plan/StepPlanTest.java
@@ -175,7 +175,7 @@ public class StepPlanTest {
}
private GraphSchema createGraph() {
- TableField idField = new TableField("id", Types.of("Long"), false);
+ TableField idField = new TableField("id", Types.of("Long", -1), false);
VertexTable vTable = new VertexTable("default", "testV",
Collections.singletonList(idField), "id");
GeaFlowGraph graph = new GeaFlowGraph("default", "test",
Lists.newArrayList(vTable),
new ArrayList<>(), new HashMap<>(), new HashMap<>(), false, false);
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/MultiFieldRadixSortTest.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/MultiFieldRadixSortTest.java
new file mode 100644
index 00000000..1b2b4502
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/MultiFieldRadixSortTest.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.runtime.query;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.geaflow.common.binary.BinaryString;
+import org.apache.geaflow.common.type.primitive.BinaryStringType;
+import org.apache.geaflow.common.type.primitive.IntegerType;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.runtime.expression.field.FieldExpression;
+import org.apache.geaflow.dsl.runtime.function.table.order.MultiFieldRadixSort;
+import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField;
+import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField.ORDER;
+import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link MultiFieldRadixSort#multiFieldRadixSort}.
+ *
+ * <p>Rows have two fields: field 0 is an Integer key and field 1 is a BinaryString payload.
+ * Assertions use JUnit's {@code assertEquals(expected, actual)} argument order.
+ */
+public class MultiFieldRadixSortTest {
+
+    @Test
+    public void testSortEmptyList() {
+        List<Row> data = new ArrayList<>();
+
+        MultiFieldRadixSort.multiFieldRadixSort(data, sortInfoOf(intField(ORDER.ASC)));
+
+        assertEquals(0, data.size());
+    }
+
+    @Test
+    public void testSortSingleElement() {
+        List<Row> data = new ArrayList<>(1);
+        data.add(row(1, "test"));
+
+        MultiFieldRadixSort.multiFieldRadixSort(data, sortInfoOf(intField(ORDER.ASC)));
+
+        assertEquals(1, data.size());
+        assertEquals(Integer.valueOf(1), intAt(data, 0));
+    }
+
+    @Test
+    public void testSortByIntegerFieldAscending() {
+        List<Row> data = new ArrayList<>(3);
+        data.add(row(3, "c"));
+        data.add(row(1, "a"));
+        data.add(row(2, "b"));
+
+        MultiFieldRadixSort.multiFieldRadixSort(data, sortInfoOf(intField(ORDER.ASC)));
+
+        assertEquals(3, data.size());
+        assertEquals(Integer.valueOf(1), intAt(data, 0));
+        assertEquals(Integer.valueOf(2), intAt(data, 1));
+        assertEquals(Integer.valueOf(3), intAt(data, 2));
+    }
+
+    @Test
+    public void testSortByIntegerFieldDescending() {
+        List<Row> data = new ArrayList<>(3);
+        data.add(row(1, "a"));
+        data.add(row(3, "c"));
+        data.add(row(2, "b"));
+
+        MultiFieldRadixSort.multiFieldRadixSort(data, sortInfoOf(intField(ORDER.DESC)));
+
+        assertEquals(3, data.size());
+        assertEquals(Integer.valueOf(3), intAt(data, 0));
+        assertEquals(Integer.valueOf(2), intAt(data, 1));
+        assertEquals(Integer.valueOf(1), intAt(data, 2));
+    }
+
+    @Test
+    public void testSortByStringFieldAscending() {
+        List<Row> data = new ArrayList<>(3);
+        data.add(row(3, "zebra"));
+        data.add(row(1, "apple"));
+        data.add(row(2, "banana"));
+
+        MultiFieldRadixSort.multiFieldRadixSort(data, sortInfoOf(stringField(ORDER.ASC)));
+
+        assertEquals(3, data.size());
+        assertEquals("apple", strAt(data, 0));
+        assertEquals("banana", strAt(data, 1));
+        assertEquals("zebra", strAt(data, 2));
+    }
+
+    @Test
+    public void testSortByStringFieldDescending() {
+        List<Row> data = new ArrayList<>(3);
+        data.add(row(1, "apple"));
+        data.add(row(3, "zebra"));
+        data.add(row(2, "banana"));
+
+        MultiFieldRadixSort.multiFieldRadixSort(data, sortInfoOf(stringField(ORDER.DESC)));
+
+        assertEquals(3, data.size());
+        assertEquals("zebra", strAt(data, 0));
+        assertEquals("banana", strAt(data, 1));
+        assertEquals("apple", strAt(data, 2));
+    }
+
+    @Test
+    public void testMultiFieldSort() {
+        List<Row> data = new ArrayList<>(4);
+        data.add(row(1, "b"));
+        data.add(row(2, "a"));
+        data.add(row(1, "a"));
+        data.add(row(2, "b"));
+
+        // Sort by the integer field first, then by the string field, both ascending.
+        MultiFieldRadixSort.multiFieldRadixSort(data,
+            sortInfoOf(intField(ORDER.ASC), stringField(ORDER.ASC)));
+
+        assertEquals(4, data.size());
+        // Expected order: (1, "a"), (1, "b"), (2, "a"), (2, "b").
+        assertEquals(Integer.valueOf(1), intAt(data, 0));
+        assertEquals("a", strAt(data, 0));
+
+        assertEquals(Integer.valueOf(1), intAt(data, 1));
+        assertEquals("b", strAt(data, 1));
+
+        assertEquals(Integer.valueOf(2), intAt(data, 2));
+        assertEquals("a", strAt(data, 2));
+
+        assertEquals(Integer.valueOf(2), intAt(data, 3));
+        assertEquals("b", strAt(data, 3));
+    }
+
+    @Test
+    public void testSortWithNullValues() {
+        List<Row> data = new ArrayList<>(3);
+        data.add(row(null, "b"));
+        data.add(row(2, "a"));
+        data.add(row(1, "c"));
+
+        MultiFieldRadixSort.multiFieldRadixSort(data, sortInfoOf(intField(ORDER.ASC)));
+
+        assertEquals(3, data.size());
+        // Null values should appear first in ascending order.
+        assertEquals(null, intAt(data, 0));
+        assertEquals(Integer.valueOf(1), intAt(data, 1));
+        assertEquals(Integer.valueOf(2), intAt(data, 2));
+    }
+
+    @Test
+    public void testSortWithNegativeNumbers() {
+        List<Row> data = new ArrayList<>(4);
+        data.add(row(-1, "a"));
+        data.add(row(3, "b"));
+        data.add(row(-5, "c"));
+        data.add(row(0, "d"));
+
+        MultiFieldRadixSort.multiFieldRadixSort(data, sortInfoOf(intField(ORDER.ASC)));
+
+        assertEquals(4, data.size());
+        assertEquals(Integer.valueOf(-5), intAt(data, 0));
+        assertEquals(Integer.valueOf(-1), intAt(data, 1));
+        assertEquals(Integer.valueOf(0), intAt(data, 2));
+        assertEquals(Integer.valueOf(3), intAt(data, 3));
+    }
+
+    @Test
+    public void testSortWithEmptyStrings() {
+        List<Row> data = new ArrayList<>(3);
+        data.add(row(1, ""));
+        data.add(row(2, "hello"));
+        data.add(row(3, "a"));
+
+        MultiFieldRadixSort.multiFieldRadixSort(data, sortInfoOf(stringField(ORDER.ASC)));
+
+        assertEquals(3, data.size());
+        // The empty string should come first.
+        assertEquals("", strAt(data, 0));
+        assertEquals("a", strAt(data, 1));
+        assertEquals("hello", strAt(data, 2));
+    }
+
+    @Test
+    public void testSortStability() {
+        List<Row> data = new ArrayList<>(3);
+        // Rows share the same sort key but carry different payloads in field 1.
+        data.add(row(1, "first"));
+        data.add(row(1, "second"));
+        data.add(row(1, "third"));
+
+        MultiFieldRadixSort.multiFieldRadixSort(data, sortInfoOf(intField(ORDER.ASC)));
+
+        assertEquals(3, data.size());
+        // Every row keeps its key, and sorting must be a permutation: no payload may be lost or
+        // duplicated. NOTE(review): this does not pin the relative order of equal keys; tighten it
+        // if MultiFieldRadixSort is documented as stable.
+        List<String> payloads = new ArrayList<>(data.size());
+        for (int i = 0; i < data.size(); i++) {
+            assertEquals(Integer.valueOf(1), intAt(data, i));
+            payloads.add(strAt(data, i));
+        }
+        payloads.sort(String::compareTo);
+        assertEquals("first", payloads.get(0));
+        assertEquals("second", payloads.get(1));
+        assertEquals("third", payloads.get(2));
+    }
+
+    /** Builds a row whose field 0 is {@code key} and field 1 is {@code payload} as a BinaryString. */
+    private static Row row(Integer key, String payload) {
+        return ObjectRow.create(new Object[]{key, BinaryString.fromString(payload)});
+    }
+
+    /** ORDER BY on the integer field 0. */
+    private static OrderByField intField(ORDER order) {
+        OrderByField field = new OrderByField();
+        field.expression = new FieldExpression(0, IntegerType.INSTANCE);
+        field.order = order;
+        return field;
+    }
+
+    /** ORDER BY on the string field 1. */
+    private static OrderByField stringField(ORDER order) {
+        OrderByField field = new OrderByField();
+        field.expression = new FieldExpression(1, BinaryStringType.INSTANCE);
+        field.order = order;
+        return field;
+    }
+
+    /** Builds a SortInfo whose sort keys are {@code fields}, in the given priority order. */
+    private static SortInfo sortInfoOf(OrderByField... fields) {
+        SortInfo sortInfo = new SortInfo();
+        sortInfo.orderByFields = new ArrayList<>(fields.length);
+        for (OrderByField field : fields) {
+            sortInfo.orderByFields.add(field);
+        }
+        return sortInfo;
+    }
+
+    /** Reads the integer field 0 of row {@code index}. */
+    private static Object intAt(List<Row> data, int index) {
+        return data.get(index).getField(0, IntegerType.INSTANCE);
+    }
+
+    /** Reads the string field 1 of row {@code index} as a Java String. */
+    private static String strAt(List<Row> data, int index) {
+        return ((BinaryString) data.get(index).getField(1, BinaryStringType.INSTANCE)).toString();
+    }
+}
\ No newline at end of file
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/SortTest.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/SortTest.java
index a0823f19..f5e95786 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/SortTest.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/SortTest.java
@@ -49,4 +49,13 @@ public class SortTest {
.execute()
.checkSinkResult();
}
+
+    /**
+     * Executes query/sort_004.sql (an ORDER BY on units, then user_name) and checks the sink
+     * output against the expected result.
+     */
+    @Test
+    public void testSort_004() throws Exception {
+        QueryTester.build()
+            .withQueryPath("/query/sort_004.sql")
+            .execute()
+            .checkSinkResult();
+    }
}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/sort_004.txt
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/sort_004.txt
new file mode 100644
index 00000000..0767504c
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/sort_004.txt
@@ -0,0 +1,22 @@
+1489576693883,2,2,1,mills
+1489576693894,8,10,1,penys
+1489576693864,7,7,2,jacks
+1489576693844,5,5,3,jacks
+1489576693834,4,4,4,mills
+1489576693884,1,1,10,hello
+1489576693884,1,19,10,hello
+1489576693874,8,8,12,jacks
+1489576693883,1,20,14,mills
+1489576693883,1,21,14,中国
+1489576693854,6,6,15,calls
+1489576693883,2,22,15,中国
+1489576693184,7,11,22,penys
+1489576693822,3,3,30,mills
+1489576693884,9,9,31,penys
+1489576693284,6,12,33,penys
+1489576693864,5,13,44,jacks
+1489576693874,4,14,51,jacks
+1489576693884,3,15,61,penys
+1489576693894,2,16,71,penys
+1489576693184,1,17,81,penys
+1489576693284,5,18,91,penys
\ No newline at end of file
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/sort_004.sql
similarity index 58%
copy from
geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java
copy to
geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/sort_004.sql
index b7bbf238..eae350ab 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/sort_004.sql
@@ -17,23 +17,29 @@
* under the License.
*/
-package org.apache.geaflow.dsl.runtime.function.table.order;
+CREATE TABLE orders(
+ createTime bigint,
+ productId bigint,
+ orderId bigint,
+ units int,
+ user_name VARCHAR(8)
+) WITH (
+ type='file',
+ geaflow.dsl.file.path = 'resource:///data/orders.txt'
+);
-import com.google.common.collect.Lists;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
+CREATE TABLE tbl_result (
+ createTime bigint,
+ productId bigint,
+ orderId bigint,
+ units int,
+ user_name VARCHAR(8)
+) WITH (
+ type='file',
+ geaflow.dsl.file.path='${target}'
+);
-public class SortInfo implements Serializable {
-
- public List<OrderByField> orderByFields = new ArrayList<>();
-
- public int fetch = -1;
-
- public SortInfo copy(List<OrderByField> orderByFields) {
- SortInfo sortInfo = new SortInfo();
- sortInfo.orderByFields = Lists.newArrayList(orderByFields);
- sortInfo.fetch = this.fetch;
- return sortInfo;
- }
-}
+INSERT INTO tbl_result
+SELECT *
+FROM orders o
+ORDER BY units, user_name
diff --git a/geaflow/geaflow-dsl/pom.xml b/geaflow/geaflow-dsl/pom.xml
index b8d58db2..f7bb8be6 100644
--- a/geaflow/geaflow-dsl/pom.xml
+++ b/geaflow/geaflow-dsl/pom.xml
@@ -44,7 +44,7 @@
</modules>
<properties>
- <calcite.version>1.18.0-geaflow_1.0</calcite.version>
+ <calcite.version>1.18.0-geaflow_1.1</calcite.version>
<kafka.version>2.4.1</kafka.version>
<kafka.scala.binary.version>2.11</kafka.scala.binary.version>
<h2.version>2.1.214</h2.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]