Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-16 Thread via GitHub


dawidwys merged PR #22951:
URL: https://github.com/apache/flink/pull/22951


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-15 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1491240847


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySortFunction.java:
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of ARRAY_SORT function. */
+@Internal
+public class ArraySortFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter elementGetter;
+private final SpecializedFunction.ExpressionEvaluator greaterEvaluator;
+
+private transient MethodHandle greaterHandle;
+
+public ArraySortFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.ARRAY_SORT, context);
+final DataType elementDataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType()
+.toInternal();
+elementGetter =
+
ArrayData.createElementGetter(elementDataType.toInternal().getLogicalType());
+greaterEvaluator =
+context.createEvaluator(
+$("element1").isGreater($("element2")),
+DataTypes.BOOLEAN(),
+DataTypes.FIELD("element1", 
elementDataType.notNull().toInternal()),
+DataTypes.FIELD("element2", 
elementDataType.notNull().toInternal()));
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+greaterHandle = greaterEvaluator.open(context);
+}
+
+public @Nullable ArrayData eval(ArrayData array) {
+return eval(array, true, true);
+}
+
+public @Nullable ArrayData eval(ArrayData array, Boolean ascendingOrder) {
+return eval(array, ascendingOrder, ascendingOrder);
+}
+
+public @Nullable ArrayData eval(ArrayData array, Boolean ascendingOrder, 
Boolean nullFirst) {
+try {
+if (array == null || ascendingOrder == null || nullFirst == null) {
+return null;
+}
+if (array.size() == 0) {
+return array;
+}
+Object[] elements = new Object[array.size()];
+for (int i = 0; i < array.size(); i++) {
+elements[i] = elementGetter.getElementOrNull(array, i);
+}
+Comparator ascendingComparator = new 
ArraySortComparator(ascendingOrder);
+Arrays.sort(
+elements,
+nullFirst
+? Comparator.nullsFirst(ascendingComparator)
+: Comparator.nullsLast(ascendingComparator));
+return new GenericArrayData(elements);
+} catch (Throwable t) {
+throw new FlinkRuntimeException(t);
+}
+}
+
+private class ArraySortComparator implements Comparator {
+private final boolean isAscending;
+
+public ArraySortComparator(boolean isAscending) {
+this.isAscending = isAscending;
+}
+
+@Override
+public int compare(Object o1, Object o2) {
+if (o1 == null || o2 == null) {
+return 0;
+}

Review Comment:
   I check it. It is not necessary. The IDE suggests `Comparator does not 
return 0 for equal elements` so I 

Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-15 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1491240847


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySortFunction.java:
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of ARRAY_SORT function. */
+@Internal
+public class ArraySortFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter elementGetter;
+private final SpecializedFunction.ExpressionEvaluator greaterEvaluator;
+
+private transient MethodHandle greaterHandle;
+
+public ArraySortFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.ARRAY_SORT, context);
+final DataType elementDataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType()
+.toInternal();
+elementGetter =
+
ArrayData.createElementGetter(elementDataType.toInternal().getLogicalType());
+greaterEvaluator =
+context.createEvaluator(
+$("element1").isGreater($("element2")),
+DataTypes.BOOLEAN(),
+DataTypes.FIELD("element1", 
elementDataType.notNull().toInternal()),
+DataTypes.FIELD("element2", 
elementDataType.notNull().toInternal()));
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+greaterHandle = greaterEvaluator.open(context);
+}
+
+public @Nullable ArrayData eval(ArrayData array) {
+return eval(array, true, true);
+}
+
+public @Nullable ArrayData eval(ArrayData array, Boolean ascendingOrder) {
+return eval(array, ascendingOrder, ascendingOrder);
+}
+
+public @Nullable ArrayData eval(ArrayData array, Boolean ascendingOrder, 
Boolean nullFirst) {
+try {
+if (array == null || ascendingOrder == null || nullFirst == null) {
+return null;
+}
+if (array.size() == 0) {
+return array;
+}
+Object[] elements = new Object[array.size()];
+for (int i = 0; i < array.size(); i++) {
+elements[i] = elementGetter.getElementOrNull(array, i);
+}
+Comparator ascendingComparator = new 
ArraySortComparator(ascendingOrder);
+Arrays.sort(
+elements,
+nullFirst
+? Comparator.nullsFirst(ascendingComparator)
+: Comparator.nullsLast(ascendingComparator));
+return new GenericArrayData(elements);
+} catch (Throwable t) {
+throw new FlinkRuntimeException(t);
+}
+}
+
+private class ArraySortComparator implements Comparator {
+private final boolean isAscending;
+
+public ArraySortComparator(boolean isAscending) {
+this.isAscending = isAscending;
+}
+
+@Override
+public int compare(Object o1, Object o2) {
+if (o1 == null || o2 == null) {
+return 0;
+}

Review Comment:
   I check it, it not necessary, the IDE suggest `Comparator does not return 0 
for equal elements` so I add 

Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-15 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1491222383


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +231,28 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(ARRAY_FULLY_COMPARABLE),
+sequence(
+ARRAY_FULLY_COMPARABLE,
+InputTypeStrategies.explicit(
+
DataTypes.BOOLEAN().nullable())),
+sequence(
+ARRAY_FULLY_COMPARABLE,
+InputTypeStrategies.explicit(
+
DataTypes.BOOLEAN().nullable()),
+InputTypeStrategies.explicit(
+
DataTypes.BOOLEAN().nullable()

Review Comment:
   ok, I will update.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-15 Thread via GitHub


dawidwys commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1490935727


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   Great description!  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-15 Thread via GitHub


dawidwys commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1490931709


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySortFunction.java:
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of ARRAY_SORT function. */
+@Internal
+public class ArraySortFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter elementGetter;
+private final SpecializedFunction.ExpressionEvaluator greaterEvaluator;
+
+private transient MethodHandle greaterHandle;
+
+public ArraySortFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.ARRAY_SORT, context);
+final DataType elementDataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType()
+.toInternal();
+elementGetter =
+
ArrayData.createElementGetter(elementDataType.toInternal().getLogicalType());
+greaterEvaluator =
+context.createEvaluator(
+$("element1").isGreater($("element2")),
+DataTypes.BOOLEAN(),
+DataTypes.FIELD("element1", 
elementDataType.notNull().toInternal()),
+DataTypes.FIELD("element2", 
elementDataType.notNull().toInternal()));
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+greaterHandle = greaterEvaluator.open(context);
+}
+
+public @Nullable ArrayData eval(ArrayData array) {
+return eval(array, true, true);
+}
+
+public @Nullable ArrayData eval(ArrayData array, Boolean ascendingOrder) {
+return eval(array, ascendingOrder, ascendingOrder);
+}
+
+public @Nullable ArrayData eval(ArrayData array, Boolean ascendingOrder, 
Boolean nullFirst) {
+try {
+if (array == null || ascendingOrder == null || nullFirst == null) {
+return null;
+}
+if (array.size() == 0) {
+return array;
+}
+Object[] elements = new Object[array.size()];
+for (int i = 0; i < array.size(); i++) {
+elements[i] = elementGetter.getElementOrNull(array, i);
+}
+Comparator ascendingComparator = new 
ArraySortComparator(ascendingOrder);
+Arrays.sort(
+elements,
+nullFirst
+? Comparator.nullsFirst(ascendingComparator)
+: Comparator.nullsLast(ascendingComparator));
+return new GenericArrayData(elements);
+} catch (Throwable t) {
+throw new FlinkRuntimeException(t);
+}
+}
+
+private class ArraySortComparator implements Comparator {
+private final boolean isAscending;
+
+public ArraySortComparator(boolean isAscending) {
+this.isAscending = isAscending;
+}
+
+@Override
+public int compare(Object o1, Object o2) {
+if (o1 == null || o2 == null) {
+return 0;
+}

Review Comment:
   Is this necessary? This should be handled by the `nullsFirst/nullsLast`, no?



##

Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-14 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489798277


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   The updated function signature is now `ARRAY_SORT(,[, 
[, ]])`, where both sort_ascending and nulls_first 
parameters are optional, allowing for more flexibility in array sorting 
behavior.
   
   The description like this
   
   `The function sorts an array, defaulting to ascending order with NULLs at 
the start when only the array is input. Specifying ascending_order as true 
orders the array in ascending with NULLs first, and setting it to false orders 
it in descending with NULLs last. Independently, null_first as true moves NULLs 
to the beginning, and as false to the end, irrespective of the sorting order. 
The function returns null if any input is null.`
   
   
   @MartijnVisser @dawidwys 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-14 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489798277


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   The updated function signature is now `ARRAY_SORT(,[, 
[, ]])`, where both sort_ascending and nulls_first 
parameters are optional, allowing for more flexibility in array sorting 
behavior.
   
   The description like this
   
   `Returns the array in sorted order. When only the input array is provided, 
ascending_order and null_first default to true, sorting the array in ascending 
order with NULLs at the start. Setting ascending_order to true sorts array in 
ascending order, placing NULLs first. Setting it to false sorts array in 
descending order, with NULLs last. null_first true places NULLs at the 
beginning, and false at the end, regardless of sort order. If any input is 
null, the function returns null.`
   
   
   @MartijnVisser @dawidwys 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-14 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489798277


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   The updated function signature is now `ARRAY_SORT(,[, 
[, ]])`, where both sort_ascending and nulls_first 
parameters are optional, allowing for more flexibility in array sorting 
behavior.
   
   The description like this
   
   `Returns the array in sorted order. When only the input array is provided, 
ascending_order and null_first default to true, sorting the array in ascending 
order with NULLs at the start. True for ascending_order results in ascending 
sort with NULLs first, while false leads to descending sort with NULLs last. 
Regardless of ascending_order, null_first true places NULLs at the beginning, 
and false at the end. If any input is null, the function returns null.`
   
   
   @MartijnVisser @dawidwys 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-14 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489798277


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   The updated function signature is now `ARRAY_SORT(,[, 
[, ]])`, where both sort_ascending and nulls_first 
parameters are optional, allowing for more flexibility in array sorting 
behavior.
   
   The description like this
   
   `The function sorts an array with ascending_order and null_first as optional 
inputs. By default, with only the input array, both are true, resulting in 
ascending order sorting with NULLs at the beginning. Specifying ascending_order 
without null_first results in sorting where true for ascending_order places 
NULLs first, and false places NULLs last. With all three inputs, null_first 
true places NULLs at the start, and false at the end, regardless of 
ascending_order's value. If any input is null, the function returns null.`
   
   
   @MartijnVisser @dawidwys 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-14 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489798277


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   The updated function signature is now `ARRAY_SORT(,[, 
[, ]])`, where both sort_ascending and nulls_first 
parameters are optional, allowing for more flexibility in array sorting 
behavior.
   @MartijnVisser @dawidwys 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-14 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489798277


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   The updated function signature is now ARRAY_SORT(, [, 
[, ]]), where both sort_ascending and nulls_first 
parameters are optional, allowing for more flexibility in array sorting 
behavior.
   @MartijnVisser @dawidwys 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-14 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489798277


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   The updated function signature is now ARRAY_SORT([, 
[, ]]), where both sort_ascending and nulls_first 
parameters are optional, allowing for more flexibility in array sorting 
behavior.
   @MartijnVisser @dawidwys 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-14 Thread via GitHub


MartijnVisser commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489314997


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   This is quite a rabbit hole  
   
   1. Databricks has two functions:
   
   * sort_array 
https://docs.databricks.com/en/sql/language-manual/functions/sort_array.html - 
Sorts ascending, puts NULL at beginning.
   *  array_sort 
https://docs.databricks.com/en/sql/language-manual/functions/array_sort.html - 
Sorts ascending, puts NULL at the end. 
   
   2. Spark has 
https://spark.apache.org/docs/latest/api/sql/index.html#sort_array which is 
indeed not different from the Databricks one. Sorts ascending, puts NULL at 
beginning
   
   3. Snowflake indeed has 
https://docs.snowflake.com/en/sql-reference/functions/array_sort - Sorts 
ascending, puts NULL at ~~the end~~ beginning.
   
   4. Trino, that has `array_sort(x)` - Sorts ascending, puts NULL at the end. 
   
   5. StarRocks also has this function 
https://docs.starrocks.io/docs/2.2/sql-reference/sql-functions/array-functions/array_sort/
 - Sorts ascending, puts NULL at beginning
   
   6. Doris has it 
https://doris.apache.org/docs/1.2/sql-manual/sql-functions/array-functions/array_sort/
 - Sorts ascending, puts NULL at beginning
   
   7. Hive uses NULLS FIRST as the default behavior in ascending ordering mode, 
per https://issues.apache.org/jira/browse/HIVE-12994 - Sorts ascending, puts 
NULL at beginning.
   
   I do think that the Snowflake has the cleanest function signature: there's 
just one function, and it has two optional arguments to give all the 
flexibility. 
   
   I think the default sorting mode should be ascending, and then it boils down 
to decide on the default location on where to put NULL. Out of the 7 
implementations, we have 6 who put NULL at the beginning, and only Databricks 
puts them at the end. Naturally, I think we should put NULL at the beginning 
but I'm +0 on the default location of NULL. 
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-14 Thread via GitHub


MartijnVisser commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489343962


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   Sounds good to me, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-14 Thread via GitHub


dawidwys commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489331336


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   @MartijnVisser Actually Snowflake does:
   > nulls_first
Default: FALSE if the ARRAY is sorted in ascending order; TRUE if the ARRAY 
is sorted in descending order.
   
   I'd suggest we:

   1. adapt Snowflake's signature where we get `ARRAY, BOOLEAN, 
BOOLEAN`, output is `ARRAY` which is `NULLABLE` if any of the input 
is nullable
   2. We use defaults: ascending and `nulls_first` for ASC, `nulls_last` for 
DESC
   3. If any of the input is null the output is also null
   
   How does this sound?  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-14 Thread via GitHub


MartijnVisser commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1489314997


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   This is quite a rabbit hole  
   
   1. Databricks has two functions:
   
   * sort_array 
https://docs.databricks.com/en/sql/language-manual/functions/sort_array.html - 
Sorts ascending, puts NULL at beginning.
   *  array_sort 
https://docs.databricks.com/en/sql/language-manual/functions/array_sort.html - 
Sorts ascending, puts NULL at the end. 
   
   2. Spark has 
https://spark.apache.org/docs/latest/api/sql/index.html#sort_array which is 
indeed not different from the Databricks one. Sorts ascending, puts NULL at 
beginning
   
   3. Snowflake indeed has 
https://docs.snowflake.com/en/sql-reference/functions/array_sort - Sorts 
ascending, puts NULL at the end. 
   
   4. Trino, that has `array_sort(x)` - Sorts ascending, puts NULL at the end. 
   
   5. StarRocks also has this function 
https://docs.starrocks.io/docs/2.2/sql-reference/sql-functions/array-functions/array_sort/
 - Sorts ascending, puts NULL at beginning
   
   6. Doris has it 
https://doris.apache.org/docs/1.2/sql-manual/sql-functions/array-functions/array_sort/
 - Sorts ascending, puts NULL at beginning
   
   7. Hive uses NULLS FIRST as the default behavior in ascending ordering mode, 
per https://issues.apache.org/jira/browse/HIVE-12994 - Sorts ascending, puts 
NULL at beginning.
   
   I do think that the Snowflake has the cleanest function signature: there's 
just one function, and it has two optional arguments to give all the 
flexibility. 
   
   I think the default sorting mode should be ascending, and then it boils down 
to decide on the default location on where to put NULL. Out of the 7 
implementations, we have 5 who put NULL at the beginning, and 2 who put them at 
the end. Naturally, I think we should put NULL at the beginning but I'm +0 on 
the default location of NULL. 
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-14 Thread via GitHub


dawidwys commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1487819172


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   > Ok, thank you, actually, I did this function last summer and follow the 
jira tickect's https://issues.apache.org/jira/browse/FLINK-26948 description.
   
   I understand that. If at some point we find out we either made mistake or 
did not put enough effort into something it's better to fix that sooner rather 
than later when we need to live with the consequences. I admit I have not 
thoroughly checked the semantics before which I should've.
   
   It's better to do something well rather than fast in my opinion.
   
   I see, so traditional RDBMS do not really support that function. It's also 
worth checking what does:
   * Snowflake: 
https://docs.snowflake.com/en/sql-reference/functions/array_sort (`null` when 
any argument is `null`), null handling separately
   * Spark: 
https://docs.databricks.com/en/sql/language-manual/functions/sort_array.html: 
from the docs it does not say what's the behaviour on `null` `ascendingOrder`, 
nulls first on asc, nulls last on desc
   * Presto: https://prestodb.io/docs/current/functions/array.html: has two 
separate functions for `ASC/DESC`
   
   To me Snowflake's behaviour is the cleanest out there. WDYT? @MartijnVisser 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-13 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1488414914


##
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java:
##
@@ -640,6 +640,28 @@ ANY, explicit(DataTypes.INT())
 .expectArgumentTypes(
 
DataTypes.ARRAY(DataTypes.INT().notNull()).notNull(),
 DataTypes.INT()),
+
TestSpec.forStrategy(sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.expectSignature("f(>)")
+
.calledWithArgumentTypes(DataTypes.ARRAY(DataTypes.ROW()))
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),
+TestSpec.forStrategy(
+"Strategy fails if input argument type is not 
ARRAY",
+
sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.calledWithArgumentTypes(DataTypes.INT())
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),
+TestSpec.forStrategy(
+"Strategy fails if the number of input 
arguments are not one",
+
sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.calledWithArgumentTypes(
+DataTypes.ARRAY(DataTypes.INT()),
+DataTypes.ARRAY(DataTypes.STRING()))
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),

Review Comment:
   Ok, I will remove this test case. Thank you for your explain.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-13 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1488411926


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   Ok, waiting for @MartijnVisser reply.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-13 Thread via GitHub


dawidwys commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1487826357


##
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java:
##
@@ -640,6 +640,28 @@ ANY, explicit(DataTypes.INT())
 .expectArgumentTypes(
 
DataTypes.ARRAY(DataTypes.INT().notNull()).notNull(),
 DataTypes.INT()),
+
TestSpec.forStrategy(sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.expectSignature("f(>)")
+
.calledWithArgumentTypes(DataTypes.ARRAY(DataTypes.ROW()))
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),
+TestSpec.forStrategy(
+"Strategy fails if input argument type is not 
ARRAY",
+
sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.calledWithArgumentTypes(DataTypes.INT())
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),
+TestSpec.forStrategy(
+"Strategy fails if the number of input 
arguments are not one",
+
sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.calledWithArgumentTypes(
+DataTypes.ARRAY(DataTypes.INT()),
+DataTypes.ARRAY(DataTypes.STRING()))
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),

Review Comment:
   Sorry, I was not clear enough. I meant we don't need to test 
`sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE)` takes a single 
argument. We don't need to test that, because that's a property of the 
`sequence`, whatever we use here instead of 
`SpecificInputTypeStrategies.ARRAY_COMPARABLE` does not really matter.
   
   The other two cases make total sense and you do it the right way!  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-13 Thread via GitHub


dawidwys commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1487823032


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +231,22 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(ARRAY_COMPARABLE),
+sequence(
+ARRAY_COMPARABLE,
+InputTypeStrategies.explicit(

Review Comment:
   You're right. Sorry, missed that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-13 Thread via GitHub


dawidwys commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1487819172


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   > Ok, thank you, actually, I did this function last summer and follow the 
jira tickect's https://issues.apache.org/jira/browse/FLINK-26948 description.
   
   I understand that. If at some point we find out we either made mistake or 
did not put enough effort into something it's better to fix that sooner rather 
than later when we need to leave with the consequences. I admit I have not 
thoroughly checked the semantics before which I should've.
   
   It's better to do something well rather than fast in my opinion.
   
   I see, so traditional RDBMS do not really support that function. It's also 
worth checking what does:
   * Snowflake: 
https://docs.snowflake.com/en/sql-reference/functions/array_sort (`null` when 
any argument is `null`), null handling separately
   * Spark: 
https://docs.databricks.com/en/sql/language-manual/functions/sort_array.html: 
from the docs it does not say what's the behaviour on `null` `ascendingOrder`, 
nulls first on asc, nulls last on desc
   * Presto: https://prestodb.io/docs/current/functions/array.html: has two 
separate functions for `ASC/DESC`
   
   To me Snowflake's behaviour is the cleanest out there. WDYT? @MartijnVisser 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-13 Thread via GitHub


dawidwys commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1487819172


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   > Ok, thank you, actually, I did this function last summer and follow the 
jira tickect's https://issues.apache.org/jira/browse/FLINK-26948 description.
   
   I understand that. If at some point we find out we either made mistake or 
did not put enough effort into something it's better to fix that sooner rather 
than later when we need to leave with the consequences. I admit I have not 
thoroughly checked the semantics before which I should've.
   
   It's better to do something well rather than fast in my opinion.
   
   I see, so traditional RDBMS do not really support that function. It's also 
worth checking what does:
   * Snowflake: 
https://docs.snowflake.com/en/sql-reference/functions/array_sort (`null` on any 
argument `null`), null handling separately
   * Spark: 
https://docs.databricks.com/en/sql/language-manual/functions/sort_array.html: 
from the docs it does not say what's the behaviour on `null` `ascendingOrder`
   * Presto: https://prestodb.io/docs/current/functions/array.html: has two 
separate functions for `ASC/DESC`
   
   To me Snowflake's behaviour is the cleanest out there. WDYT? @MartijnVisser 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-13 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1486986597


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   SQL Server: Uses the ORDER BY clause for sorting based on column names, 
supporting ascending or descending order. Does not have native array types but 
allows sorting through table variables or temporary tables.
   
   SQLite: Similar to SQL Server, it relies on ORDER BY for sorting without 
built-in array support.
   
   PostgreSQL: Supports arrays and provides functions like array_sort for 
direct sorting of array elements.
   For postgresql https://www.postgresql.org/docs/8.4/intarray.html
   it has four ways to sort an array
   ```
   
   sort(int[], text dir) | int[] | sort array — dir must be asc or desc | 
sort('{1,2,3}'::int[], 'desc') | {3,2,1}
   sort(int[]) | int[] | sort in ascending order | sort(array[11,77,44]) | 
{11,44,77}
   sort_asc(int[]) | int[] | sort in ascending order |   |  
   sort_desc(int[]) | int[] | sort in descending order |  
   ```
   MySQL & MariaDB: Lack built-in array support; sorting is achieved using 
ORDER BY. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-13 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1486986597


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   SQL Server: Uses the ORDER BY clause for sorting based on column names, 
supporting ascending or descending order. Does not have native array types but 
allows sorting through table variables or temporary tables.
   
   SQLite: Similar to SQL Server, it relies on ORDER BY for sorting without 
built-in array support.
   
   PostgreSQL: Supports arrays and provides functions like array_sort for 
direct sorting of array elements.
   For postgresql https://www.postgresql.org/docs/8.4/intarray.html
   it has four ways to sort an array
   ```
   
   sort(int[], text dir) | int[] | sort array — dir must be asc or desc | 
sort('{1,2,3}'::int[], 'desc') | {3,2,1}
   sort(int[]) | int[] | sort in ascending order | sort(array[11,77,44]) | 
{11,44,77}
   sort_asc(int[]) | int[] | sort in ascending order |   |  
   sort_desc(int[]) | int[] | sort in descending order |  
   ```
   MySQL & MariaDB: Lack built-in array support; sorting is achieved using 
ORDER BY. For JSON array elements, functions such as JSON_EXTRACT are used.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-13 Thread via GitHub


hanyuzheng7 commented on PR #22951:
URL: https://github.com/apache/flink/pull/22951#issuecomment-1940657344

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-12 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1486986597


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   
   SQL Server: Uses the ORDER BY clause for sorting based on column names, 
supporting ascending or descending order. Does not have native array types but 
allows sorting through table variables or temporary tables. More info.
   
   SQLite: Similar to SQL Server, it relies on ORDER BY for sorting without 
built-in array support.
   
   PostgreSQL: Supports arrays and provides functions like array_sort for 
direct sorting of array elements.
   For postgresql https://www.postgresql.org/docs/8.4/intarray.html
   it has four ways to sort an array
   ```
   
   sort(int[], text dir) | int[] | sort array — dir must be asc or desc | 
sort('{1,2,3}'::int[], 'desc') | {3,2,1}
   sort(int[]) | int[] | sort in ascending order | sort(array[11,77,44]) | 
{11,44,77}
   sort_asc(int[]) | int[] | sort in ascending order |   |  
   sort_desc(int[]) | int[] | sort in descending order |  
   ```
   MySQL & MariaDB: Lack built-in array support; sorting is achieved using 
ORDER BY. For JSON array elements, functions such as JSON_EXTRACT are used.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-12 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1486986597


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   For postgresql https://www.postgresql.org/docs/8.4/intarray.html
   it has four ways to sort an array
   ```
   sort(int[], text dir) | int[] | sort array — dir must be asc or desc | 
sort('{1,2,3}'::int[], 'desc') | {3,2,1}
   sort(int[]) | int[] | sort in ascending order | sort(array[11,77,44]) | 
{11,44,77}
   sort_asc(int[]) | int[] | sort in ascending order |   |  
   sort_desc(int[]) | int[] | sort in descending order |  
   ```
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-12 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1486986597


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   For postgresql https://www.postgresql.org/docs/8.4/intarray.html
   it has three ways to sort an array
   ```
   sort(int[], text dir) | int[] | sort array — dir must be asc or desc | 
sort('{1,2,3}'::int[], 'desc') | {3,2,1}
   sort(int[]) | int[] | sort in ascending order | sort(array[11,77,44]) | 
{11,44,77}
   sort_asc(int[]) | int[] | sort in ascending order |   |  
   sort_desc(int[]) | int[] | sort in descending order |  
   ```
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-12 Thread via GitHub


hanyuzheng7 commented on PR #22951:
URL: https://github.com/apache/flink/pull/22951#issuecomment-1939820210

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-12 Thread via GitHub


hanyuzheng7 commented on PR #22951:
URL: https://github.com/apache/flink/pull/22951#issuecomment-1939582774

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-12 Thread via GitHub


hanyuzheng7 commented on PR #22951:
URL: https://github.com/apache/flink/pull/22951#issuecomment-1939194309

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-12 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1486481830


##
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java:
##
@@ -640,6 +640,28 @@ ANY, explicit(DataTypes.INT())
 .expectArgumentTypes(
 
DataTypes.ARRAY(DataTypes.INT().notNull()).notNull(),
 DataTypes.INT()),
+
TestSpec.forStrategy(sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.expectSignature("f(>)")
+
.calledWithArgumentTypes(DataTypes.ARRAY(DataTypes.ROW()))
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),
+TestSpec.forStrategy(
+"Strategy fails if input argument type is not 
ARRAY",
+
sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.calledWithArgumentTypes(DataTypes.INT())
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),
+TestSpec.forStrategy(
+"Strategy fails if the number of input 
arguments are not one",
+
sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.calledWithArgumentTypes(
+DataTypes.ARRAY(DataTypes.INT()),
+DataTypes.ARRAY(DataTypes.STRING()))
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),

Review Comment:
   Yes, although I use `sequence`, actually I test `ARRAY_COMPARABLE` strategy 
here. If I don't use `sequence`, I cannot test ArgumentTypeStrategy here. I saw 
other contributor using `sequence` testing their `ArgumentTypeStrategy` in this 
class.
   
   Because https://github.com/apache/flink/pull/22951#discussion_r1484467843, I 
delete old test 
file([flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategyTest.java](url))
 and add these test in the `InputTypeStrategiesTest`.



##
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java:
##
@@ -640,6 +640,28 @@ ANY, explicit(DataTypes.INT())
 .expectArgumentTypes(
 
DataTypes.ARRAY(DataTypes.INT().notNull()).notNull(),
 DataTypes.INT()),
+
TestSpec.forStrategy(sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.expectSignature("f(>)")
+
.calledWithArgumentTypes(DataTypes.ARRAY(DataTypes.ROW()))
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),
+TestSpec.forStrategy(
+"Strategy fails if input argument type is not 
ARRAY",
+
sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.calledWithArgumentTypes(DataTypes.INT())
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),
+TestSpec.forStrategy(
+"Strategy fails if the number of input 
arguments are not one",
+
sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.calledWithArgumentTypes(
+DataTypes.ARRAY(DataTypes.INT()),
+DataTypes.ARRAY(DataTypes.STRING()))
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),

Review Comment:
   Yes, although I use `sequence`, actually I test `ARRAY_COMPARABLE` strategy 
here. If I don't use `sequence`, I cannot test ArgumentTypeStrategy here. I saw 
other contributor using `sequence` testing their `ArgumentTypeStrategy` in this 
class.
   
   Because https://github.com/apache/flink/pull/22951#discussion_r1484467843, I 
deleted old test 
file([flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategyTest.java](url))
 and add these test in the `InputTypeStrategiesTest`.



-- 

Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-12 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1486487968


##
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java:
##
@@ -640,6 +640,28 @@ ANY, explicit(DataTypes.INT())
 .expectArgumentTypes(
 
DataTypes.ARRAY(DataTypes.INT().notNull()).notNull(),
 DataTypes.INT()),
+
TestSpec.forStrategy(sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.expectSignature("f(>)")
+
.calledWithArgumentTypes(DataTypes.ARRAY(DataTypes.ROW()))
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),
+TestSpec.forStrategy(
+"Strategy fails if input argument type is not 
ARRAY",
+
sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.calledWithArgumentTypes(DataTypes.INT())
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),
+TestSpec.forStrategy(
+"Strategy fails if the number of input 
arguments are not one",
+
sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.calledWithArgumentTypes(
+DataTypes.ARRAY(DataTypes.INT()),
+DataTypes.ARRAY(DataTypes.STRING()))
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),

Review Comment:
   Because https://github.com/apache/flink/pull/22951#discussion_r1484467843, I 
delete old test file and add these test in the `InputTypeStrategiesTest`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-12 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1486487968


##
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java:
##
@@ -640,6 +640,28 @@ ANY, explicit(DataTypes.INT())
 .expectArgumentTypes(
 
DataTypes.ARRAY(DataTypes.INT().notNull()).notNull(),
 DataTypes.INT()),
+
TestSpec.forStrategy(sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.expectSignature("f(>)")
+
.calledWithArgumentTypes(DataTypes.ARRAY(DataTypes.ROW()))
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),
+TestSpec.forStrategy(
+"Strategy fails if input argument type is not 
ARRAY",
+
sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.calledWithArgumentTypes(DataTypes.INT())
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),
+TestSpec.forStrategy(
+"Strategy fails if the number of input 
arguments are not one",
+
sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.calledWithArgumentTypes(
+DataTypes.ARRAY(DataTypes.INT()),
+DataTypes.ARRAY(DataTypes.STRING()))
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),

Review Comment:
   Because https://github.com/apache/flink/pull/22951#discussion_r1484467843, I 
delete old test file and add these test in the `InputTypeStrategiesTest`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-12 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1486481830


##
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java:
##
@@ -640,6 +640,28 @@ ANY, explicit(DataTypes.INT())
 .expectArgumentTypes(
 
DataTypes.ARRAY(DataTypes.INT().notNull()).notNull(),
 DataTypes.INT()),
+
TestSpec.forStrategy(sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.expectSignature("f(>)")
+
.calledWithArgumentTypes(DataTypes.ARRAY(DataTypes.ROW()))
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),
+TestSpec.forStrategy(
+"Strategy fails if input argument type is not 
ARRAY",
+
sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.calledWithArgumentTypes(DataTypes.INT())
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),
+TestSpec.forStrategy(
+"Strategy fails if the number of input 
arguments are not one",
+
sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.calledWithArgumentTypes(
+DataTypes.ARRAY(DataTypes.INT()),
+DataTypes.ARRAY(DataTypes.STRING()))
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),

Review Comment:
   Yes, although I use `sequence`, actually I test `ARRAY_COMPARABLE` strategy 
here. If I don't use `sequence`, I cannot test ArgumentTypeStrategy here. I saw 
other contributor using `sequence` testing their `ArgumentTypeStrategy` in this 
class.
   
   The test is same as
   
[flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategyTest.java](url)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-12 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1486481830


##
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java:
##
@@ -640,6 +640,28 @@ ANY, explicit(DataTypes.INT())
 .expectArgumentTypes(
 
DataTypes.ARRAY(DataTypes.INT().notNull()).notNull(),
 DataTypes.INT()),
+
TestSpec.forStrategy(sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.expectSignature("f(>)")
+
.calledWithArgumentTypes(DataTypes.ARRAY(DataTypes.ROW()))
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),
+TestSpec.forStrategy(
+"Strategy fails if input argument type is not 
ARRAY",
+
sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.calledWithArgumentTypes(DataTypes.INT())
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),
+TestSpec.forStrategy(
+"Strategy fails if the number of input 
arguments are not one",
+
sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.calledWithArgumentTypes(
+DataTypes.ARRAY(DataTypes.INT()),
+DataTypes.ARRAY(DataTypes.STRING()))
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),

Review Comment:
   Yes, although I use `sequence`, actually I test `ARRAY_COMPARABLE` strategy 
here. If I don't use `sequence`, I cannot test ArgumentTypeStrategy here. I saw 
other contributor using `sequence` testing their `ArgumentTypeStrategy` in this 
class.
   
   The test is same as
   
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategyTest.java



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-12 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1486481830


##
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java:
##
@@ -640,6 +640,28 @@ ANY, explicit(DataTypes.INT())
 .expectArgumentTypes(
 
DataTypes.ARRAY(DataTypes.INT().notNull()).notNull(),
 DataTypes.INT()),
+
TestSpec.forStrategy(sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.expectSignature("f(>)")
+
.calledWithArgumentTypes(DataTypes.ARRAY(DataTypes.ROW()))
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),
+TestSpec.forStrategy(
+"Strategy fails if input argument type is not 
ARRAY",
+
sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.calledWithArgumentTypes(DataTypes.INT())
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),
+TestSpec.forStrategy(
+"Strategy fails if the number of input 
arguments are not one",
+
sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.calledWithArgumentTypes(
+DataTypes.ARRAY(DataTypes.INT()),
+DataTypes.ARRAY(DataTypes.STRING()))
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),

Review Comment:
   Yes, although I use `sequence`, actually I test `ARRAY_COMPARABLE` strategy 
here. If I don't use `sequence`, I cannot test ArgumentTypeStrategy here. I saw 
other contributor using `sequence` testing their `ArgumentTypeStrategy` in this 
class.
   
   The test is similar as 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategyTest.java



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-12 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1486472909


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +231,22 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(ARRAY_COMPARABLE),
+sequence(
+ARRAY_COMPARABLE,
+InputTypeStrategies.explicit(

Review Comment:
   If we not use  InputTypeStrategies.explicit here, it will conflict with 
   `import static 
org.apache.flink.table.types.inference.TypeStrategies.explicit;`



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java:
##
@@ -89,6 +90,10 @@ public static InputTypeStrategy windowTimeIndicator() {
 public static final ArgumentTypeStrategy ARRAY_ELEMENT_ARG =
 new ArrayElementArgumentTypeStrategy();
 
+/** Argument type representing the array is comparable. */
+public static final ArgumentTypeStrategy ARRAY_COMPARABLE =

Review Comment:
   ok



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementArgumentTypeStrategy.java:
##
@@ -34,40 +32,33 @@
 import 
org.apache.flink.table.types.logical.StructuredType.StructuredComparison;
 import org.apache.flink.util.Preconditions;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
 /**
- * An {@link InputTypeStrategy} that checks if the input argument is an ARRAY 
type and check whether
- * its' elements are comparable.
+ * An {@link ArgumentTypeStrategy} that checks if the input argument is an 
ARRAY type and check
+ * whether its' elements are comparable.
  *
  * It requires one argument.
  *
  * For the rules which types are comparable with which types see {@link
  * #areComparable(LogicalType, LogicalType)}.
  */
 @Internal
-public final class ArrayComparableElementTypeStrategy implements 
InputTypeStrategy {
+public final class ArrayComparableElementArgumentTypeStrategy implements 
ArgumentTypeStrategy {
+
 private final StructuredComparison requiredComparison;
-private final ConstantArgumentCount argumentCount;
 
-public ArrayComparableElementTypeStrategy(StructuredComparison 
requiredComparison) {
+public ArrayComparableElementArgumentTypeStrategy(StructuredComparison 
requiredComparison) {
 Preconditions.checkArgument(requiredComparison != 
StructuredComparison.NONE);
 this.requiredComparison = requiredComparison;
-this.argumentCount = ConstantArgumentCount.of(1);
-}
-
-@Override
-public ArgumentCount getArgumentCount() {
-return argumentCount;
 }
 
 @Override
-public Optional> inferInputTypes(
-CallContext callContext, boolean throwOnFailure) {
+public Optional inferArgumentType(
+CallContext callContext, int argumentPos, boolean throwOnFailure) {
 final List argumentDataTypes = 
callContext.getArgumentDataTypes();
-final DataType argumentType = argumentDataTypes.get(0);
+final DataType argumentType = argumentDataTypes.get(argumentPos);
 if (!argumentType.getLogicalType().is(LogicalTypeRoot.ARRAY)) {
 return callContext.fail(throwOnFailure, "All arguments requires to 
be an ARRAY type");

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-12 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1486465811


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySortFunction.java:
##
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of ARRAY_SORT function. */
+@Internal
+public class ArraySortFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter elementGetter;
+private final SpecializedFunction.ExpressionEvaluator greaterEvaluator;
+
+private transient MethodHandle greaterHandle;
+
+public ArraySortFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.ARRAY_SORT, context);
+final DataType elementDataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType()
+.toInternal();
+elementGetter =
+
ArrayData.createElementGetter(elementDataType.toInternal().getLogicalType());
+greaterEvaluator =
+context.createEvaluator(
+$("element1").isGreater($("element2")),
+DataTypes.BOOLEAN(),
+DataTypes.FIELD("element1", 
elementDataType.notNull().toInternal()),
+DataTypes.FIELD("element2", 
elementDataType.notNull().toInternal()));
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+greaterHandle = greaterEvaluator.open(context);
+}
+
+public @Nullable ArrayData eval(ArrayData array, Boolean... 
ascendingOrder) {
+try {
+if (array == null) {
+return null;
+}
+if (array.size() == 0) {
+return array;
+}
+boolean isAscending = ascendingOrder.length > 0 ? 
ascendingOrder[0] : true;
+Object[] elements = new Object[array.size()];
+for (int i = 0; i < array.size(); i++) {
+elements[i] = elementGetter.getElementOrNull(array, i);
+}
+Comparator ascendingComparator = new 
ArraySortComparator(isAscending);
+Arrays.sort(elements, ascendingComparator);
+return new GenericArrayData(elements);
+} catch (Throwable t) {
+throw new FlinkRuntimeException(t);
+}
+}
+
+private class ArraySortComparator implements Comparator {
+private final boolean isAscending;
+
+public ArraySortComparator(boolean isAscending) {
+this.isAscending = isAscending;
+}
+
+@Override
+public int compare(Object o1, Object o2) {
+Comparator baseComparator =
+(a, b) -> {
+try {
+if (a == null || b == null) {
+return 0;
+}
+boolean isGreater = (boolean) 
greaterHandle.invoke(a, b);
+return isGreater ? 1 : -1;
+} catch (Throwable e) {
+throw new RuntimeException(e);
+}
+};
+Comparator comparator =
+isAscending
+

Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-12 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1486372018


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   Ok, thank you, actually, I did this function last summer and follow the jira 
tickect's https://issues.apache.org/jira/browse/FLINK-26948 description.
   And I will check other RDNMS today.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-12 Thread via GitHub


dawidwys commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1486362717


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   If you're looking for ways to verify other databases that's a good start: 
https://sqlfiddle.com/



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-12 Thread via GitHub


dawidwys commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1485933655


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementArgumentTypeStrategy.java:
##
@@ -34,40 +32,33 @@
 import 
org.apache.flink.table.types.logical.StructuredType.StructuredComparison;
 import org.apache.flink.util.Preconditions;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
 /**
- * An {@link InputTypeStrategy} that checks if the input argument is an ARRAY 
type and check whether
- * its' elements are comparable.
+ * An {@link ArgumentTypeStrategy} that checks if the input argument is an 
ARRAY type and check
+ * whether its' elements are comparable.
  *
  * It requires one argument.
  *
  * For the rules which types are comparable with which types see {@link
  * #areComparable(LogicalType, LogicalType)}.
  */
 @Internal
-public final class ArrayComparableElementTypeStrategy implements 
InputTypeStrategy {
+public final class ArrayComparableElementArgumentTypeStrategy implements 
ArgumentTypeStrategy {
+
 private final StructuredComparison requiredComparison;
-private final ConstantArgumentCount argumentCount;
 
-public ArrayComparableElementTypeStrategy(StructuredComparison 
requiredComparison) {
+public ArrayComparableElementArgumentTypeStrategy(StructuredComparison 
requiredComparison) {
 Preconditions.checkArgument(requiredComparison != 
StructuredComparison.NONE);
 this.requiredComparison = requiredComparison;
-this.argumentCount = ConstantArgumentCount.of(1);
-}
-
-@Override
-public ArgumentCount getArgumentCount() {
-return argumentCount;
 }
 
 @Override
-public Optional> inferInputTypes(
-CallContext callContext, boolean throwOnFailure) {
+public Optional inferArgumentType(
+CallContext callContext, int argumentPos, boolean throwOnFailure) {
 final List argumentDataTypes = 
callContext.getArgumentDataTypes();
-final DataType argumentType = argumentDataTypes.get(0);
+final DataType argumentType = argumentDataTypes.get(argumentPos);
 if (!argumentType.getLogicalType().is(LogicalTypeRoot.ARRAY)) {
 return callContext.fail(throwOnFailure, "All arguments requires to 
be an ARRAY type");

Review Comment:
   please update the comment
   
   It's an argument type strategy now, so it applies to a single argument not 
`ALL`



##
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java:
##
@@ -640,6 +640,28 @@ ANY, explicit(DataTypes.INT())
 .expectArgumentTypes(
 
DataTypes.ARRAY(DataTypes.INT().notNull()).notNull(),
 DataTypes.INT()),
+
TestSpec.forStrategy(sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.expectSignature("f(>)")
+
.calledWithArgumentTypes(DataTypes.ARRAY(DataTypes.ROW()))
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),
+TestSpec.forStrategy(
+"Strategy fails if input argument type is not 
ARRAY",
+
sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.calledWithArgumentTypes(DataTypes.INT())
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),
+TestSpec.forStrategy(
+"Strategy fails if the number of input 
arguments are not one",
+
sequence(SpecificInputTypeStrategies.ARRAY_COMPARABLE))
+.calledWithArgumentTypes(
+DataTypes.ARRAY(DataTypes.INT()),
+DataTypes.ARRAY(DataTypes.STRING()))
+.expectErrorMessage(
+"Invalid input arguments. Expected signatures 
are:\n"
++ "f(>)"),

Review Comment:
   You test the `sequence` strategy here. We have tests for that already.



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +231,22 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+

Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-11 Thread via GitHub


hanyuzheng7 commented on PR #22951:
URL: https://github.com/apache/flink/pull/22951#issuecomment-1938091082

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-10 Thread via GitHub


hanyuzheng7 commented on PR #22951:
URL: https://github.com/apache/flink/pull/22951#issuecomment-1936932786

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-09 Thread via GitHub


hanyuzheng7 commented on PR #22951:
URL: https://github.com/apache/flink/pull/22951#issuecomment-1936835154

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-09 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1484533063


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySortFunction.java:
##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of ARRAY_SORT function. */
+@Internal
+public class ArraySortFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter elementGetter;
+private final SpecializedFunction.ExpressionEvaluator greaterEvaluator;
+
+private transient MethodHandle greaterHandle;
+
+public ArraySortFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.ARRAY_SORT, context);
+final DataType elementDataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType();
+elementGetter = 
ArrayData.createElementGetter(elementDataType.getLogicalType());
+greaterEvaluator =
+context.createEvaluator(
+$("element1").isGreater($("element2")),
+DataTypes.BOOLEAN(),
+DataTypes.FIELD("element1", 
elementDataType.notNull().toInternal()),
+DataTypes.FIELD("element2", 
elementDataType.notNull().toInternal()));
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+greaterHandle = greaterEvaluator.open(context);
+}
+
+public @Nullable ArrayData eval(ArrayData array, Boolean... 
ascendingOrder) {
+try {
+if (array == null || ascendingOrder.length > 0 && 
ascendingOrder[0] == null) {
+return null;
+}
+if (array.size() == 0) {
+return array;
+}
+boolean isAscending = ascendingOrder.length > 0 ? 
ascendingOrder[0] : true;
+Object[] elements = new Object[array.size()];
+for (int i = 0; i < array.size(); i++) {
+elements[i] = elementGetter.getElementOrNull(array, i);
+}
+if (isAscending) {
+Comparator ascendingComparator = new 
MyComparator(true);
+Arrays.sort(elements, ascendingComparator);
+} else {
+Comparator ascendingComparator = new 
MyComparator(false);
+Arrays.sort(elements, ascendingComparator);
+}
+return new GenericArrayData(elements);
+} catch (Throwable t) {
+throw new FlinkRuntimeException(t);
+}
+}
+
+private class MyComparator implements Comparator {
+private final boolean isAscending;
+
+public MyComparator(boolean isAscending) {
+this.isAscending = isAscending;
+}
+
+@Override
+public int compare(Object o1, Object o2) {
+try {
+if (isAscending) {
+if (o1 == null) {
+return -1;
+}
+if (o2 == null) {
+return -1;
+}
+boolean found = (boolean) greaterHandle.invoke(o1, o2);

Review Comment:
   Ok, I will update it.



-- 
This is an automated message from the Apache Git Service.
To respond to 

Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-09 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1484532480


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySortFunction.java:
##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of ARRAY_SORT function. */
+@Internal
+public class ArraySortFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter elementGetter;
+private final SpecializedFunction.ExpressionEvaluator greaterEvaluator;
+
+private transient MethodHandle greaterHandle;
+
+public ArraySortFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.ARRAY_SORT, context);
+final DataType elementDataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType();
+elementGetter = 
ArrayData.createElementGetter(elementDataType.getLogicalType());
+greaterEvaluator =
+context.createEvaluator(
+$("element1").isGreater($("element2")),
+DataTypes.BOOLEAN(),
+DataTypes.FIELD("element1", 
elementDataType.notNull().toInternal()),
+DataTypes.FIELD("element2", 
elementDataType.notNull().toInternal()));
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+greaterHandle = greaterEvaluator.open(context);
+}
+
+public @Nullable ArrayData eval(ArrayData array, Boolean... 
ascendingOrder) {
+try {
+if (array == null || ascendingOrder.length > 0 && 
ascendingOrder[0] == null) {
+return null;
+}
+if (array.size() == 0) {
+return array;
+}
+boolean isAscending = ascendingOrder.length > 0 ? 
ascendingOrder[0] : true;
+Object[] elements = new Object[array.size()];
+for (int i = 0; i < array.size(); i++) {
+elements[i] = elementGetter.getElementOrNull(array, i);
+}
+if (isAscending) {
+Comparator ascendingComparator = new 
MyComparator(true);
+Arrays.sort(elements, ascendingComparator);
+} else {
+Comparator ascendingComparator = new 
MyComparator(false);
+Arrays.sort(elements, ascendingComparator);
+}

Review Comment:
   Ok, thanks for your suggestion.



##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySortFunction.java:
##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS 

Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-09 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1484532093


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySortFunction.java:
##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of ARRAY_SORT function. */
+@Internal
+public class ArraySortFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter elementGetter;
+private final SpecializedFunction.ExpressionEvaluator greaterEvaluator;
+
+private transient MethodHandle greaterHandle;
+
+public ArraySortFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.ARRAY_SORT, context);
+final DataType elementDataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType();
+elementGetter = 
ArrayData.createElementGetter(elementDataType.getLogicalType());

Review Comment:
   Ok, I will update it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-09 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1484526273


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySortFunction.java:
##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of ARRAY_SORT function. */
+@Internal
+public class ArraySortFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter elementGetter;
+private final SpecializedFunction.ExpressionEvaluator greaterEvaluator;
+
+private transient MethodHandle greaterHandle;
+
+public ArraySortFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.ARRAY_SORT, context);
+final DataType elementDataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType();
+elementGetter = 
ArrayData.createElementGetter(elementDataType.getLogicalType());
+greaterEvaluator =
+context.createEvaluator(
+$("element1").isGreater($("element2")),
+DataTypes.BOOLEAN(),
+DataTypes.FIELD("element1", 
elementDataType.notNull().toInternal()),
+DataTypes.FIELD("element2", 
elementDataType.notNull().toInternal()));
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+greaterHandle = greaterEvaluator.open(context);
+}
+
+public @Nullable ArrayData eval(ArrayData array, Boolean... 
ascendingOrder) {
+try {
+if (array == null || ascendingOrder.length > 0 && 
ascendingOrder[0] == null) {
+return null;
+}

Review Comment:
   Oh, I see the description in the docs/data/sql_functions.yml is different 
than https://github.com/apache/flink/pull/22951#issue-1788425522
   ```
   Returns:
   if the array is null or ascendingOrder is null return null. if array is 
empty return empty.
   ```
   And  I agree with you, we should not allow second argument NULL, I will 
update it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-09 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1484526273


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySortFunction.java:
##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of ARRAY_SORT function. */
+@Internal
+public class ArraySortFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter elementGetter;
+private final SpecializedFunction.ExpressionEvaluator greaterEvaluator;
+
+private transient MethodHandle greaterHandle;
+
+public ArraySortFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.ARRAY_SORT, context);
+final DataType elementDataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType();
+elementGetter = 
ArrayData.createElementGetter(elementDataType.getLogicalType());
+greaterEvaluator =
+context.createEvaluator(
+$("element1").isGreater($("element2")),
+DataTypes.BOOLEAN(),
+DataTypes.FIELD("element1", 
elementDataType.notNull().toInternal()),
+DataTypes.FIELD("element2", 
elementDataType.notNull().toInternal()));
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+greaterHandle = greaterEvaluator.open(context);
+}
+
+public @Nullable ArrayData eval(ArrayData array, Boolean... 
ascendingOrder) {
+try {
+if (array == null || ascendingOrder.length > 0 && 
ascendingOrder[0] == null) {
+return null;
+}

Review Comment:
   Yes, in  the description it said that 
   Returns:
   if the array is null or ascendingOrder is null return null. if array is 
empty return empty.
   And  I agree with you, we should not allow second argument NULL, I will 
update it.



##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySortFunction.java:
##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import 

Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-09 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1484524404


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementArgumentTypeStrategy.java:
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.StructuredType;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * An {@link ArgumentTypeStrategy} that checks if the input argument is an 
ARRAY type and check
+ * whether its' elements are comparable.
+ *
+ * It requires one argument.
+ *
+ * For the rules which types are comparable with which types see {@link
+ * #areComparable(LogicalType, LogicalType)}.
+ */
+public final class ArrayComparableElementArgumentTypeStrategy implements 
ArgumentTypeStrategy {

Review Comment:
   Ok, I will do that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-09 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1484523821


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   Ok, I will check other RDNMS.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-09 Thread via GitHub


dawidwys commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1484478770


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   You should update the output strategy then to take into account the second 
argument. So far you have only: `nullableIfArgs(argument(0))`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-09 Thread via GitHub


dawidwys commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1484479808


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   Can you check what's the behaviour in other RDBMS? E.g. postgres, oracle, 
sql server?
   
   Personally I find the behaviour a bit weird we accept nullable type for the 
second argument.



##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   Can you check what's the behaviour in other RDBMS? E.g. postgres, oracle, 
sql server?
   
   Personally I find the behaviour a bit weird we accept a nullable type for 
the second argument.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-09 Thread via GitHub


dawidwys commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1484473796


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySortFunction.java:
##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of ARRAY_SORT function. */
+@Internal
+public class ArraySortFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter elementGetter;
+private final SpecializedFunction.ExpressionEvaluator greaterEvaluator;
+
+private transient MethodHandle greaterHandle;
+
+public ArraySortFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.ARRAY_SORT, context);
+final DataType elementDataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType();
+elementGetter = 
ArrayData.createElementGetter(elementDataType.getLogicalType());
+greaterEvaluator =
+context.createEvaluator(
+$("element1").isGreater($("element2")),
+DataTypes.BOOLEAN(),
+DataTypes.FIELD("element1", 
elementDataType.notNull().toInternal()),
+DataTypes.FIELD("element2", 
elementDataType.notNull().toInternal()));
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+greaterHandle = greaterEvaluator.open(context);
+}
+
+public @Nullable ArrayData eval(ArrayData array, Boolean... 
ascendingOrder) {
+try {
+if (array == null || ascendingOrder.length > 0 && 
ascendingOrder[0] == null) {
+return null;
+}

Review Comment:
   Where? Still can't see it:
   > Returns the array in sorted order. Sorts the input array in ascending or 
descending order according to the natural ordering of the array elements. NULL 
elements are placed at the beginning of the returned array in ascending order 
or at the end of the returned array in descending order. If the array itself is 
null, the function will return null. The optional ascendingOrder argument 
defaults to true if not specified.
   
   It says: output is a sorted array, which might be null if the input array is 
null.
   
   Do I read it wrong? The output type does not depend on the `ascendingOrder` 
from that description.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-09 Thread via GitHub


dawidwys commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1484467843


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementArgumentTypeStrategy.java:
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.StructuredType;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * An {@link ArgumentTypeStrategy} that checks if the input argument is an 
ARRAY type and check
+ * whether its' elements are comparable.
+ *
+ * It requires one argument.
+ *
+ * For the rules which types are comparable with which types see {@link
+ * #areComparable(LogicalType, LogicalType)}.
+ */
+public final class ArrayComparableElementArgumentTypeStrategy implements 
ArgumentTypeStrategy {

Review Comment:
   Would it make sense to convert the old class to `ArgumentTypeStrategy`? 
Building an `InputTypeStrategy` from `ArgumentTypeStrategy` usually should be 
simple: e.g. `sequence(comparable())`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-09 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1484427940


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementArgumentTypeStrategy.java:
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.StructuredType;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * An {@link ArgumentTypeStrategy} that checks if the input argument is an 
ARRAY type and check
+ * whether its' elements are comparable.
+ *
+ * It requires one argument.
+ *
+ * For the rules which types are comparable with which types see {@link
+ * #areComparable(LogicalType, LogicalType)}.
+ */
+public final class ArrayComparableElementArgumentTypeStrategy implements 
ArgumentTypeStrategy {

Review Comment:
   Yes, 
https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java
 
   is also write by myself, but that return type is InputTypeStrategy.
   We need ArgumentTypeStrategy here, so I generate a new class which return 
ArgumentTypeStrategy.
   https://github.com/apache/flink/pull/22951/files#r1484440158
   Maybe we can extract same code as an utils and reuse the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-09 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1484440158


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(

Review Comment:
   I use sequence here, so I need ArgumentTypeStrategy here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-09 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1484427940


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementArgumentTypeStrategy.java:
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.StructuredType;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * An {@link ArgumentTypeStrategy} that checks if the input argument is an 
ARRAY type and check
+ * whether its' elements are comparable.
+ *
+ * It requires one argument.
+ *
+ * For the rules which types are comparable with which types see {@link
+ * #areComparable(LogicalType, LogicalType)}.
+ */
+public final class ArrayComparableElementArgumentTypeStrategy implements 
ArgumentTypeStrategy {

Review Comment:
   Yes, 
https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java
 
   is also write by myself, but that return type is InputTypeStrategy.
   We need ArgumentTypeStrategy here, so I generate a new class which return 
ArgumentTypeStrategy.
   Maybe we can extract same code as an utils and reuse the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-09 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1484430188


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySortFunction.java:
##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of ARRAY_SORT function. */
+@Internal
+public class ArraySortFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter elementGetter;
+private final SpecializedFunction.ExpressionEvaluator greaterEvaluator;
+
+private transient MethodHandle greaterHandle;
+
+public ArraySortFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.ARRAY_SORT, context);
+final DataType elementDataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType();
+elementGetter = 
ArrayData.createElementGetter(elementDataType.getLogicalType());
+greaterEvaluator =
+context.createEvaluator(
+$("element1").isGreater($("element2")),
+DataTypes.BOOLEAN(),
+DataTypes.FIELD("element1", 
elementDataType.notNull().toInternal()),
+DataTypes.FIELD("element2", 
elementDataType.notNull().toInternal()));
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+greaterHandle = greaterEvaluator.open(context);
+}
+
+public @Nullable ArrayData eval(ArrayData array, Boolean... 
ascendingOrder) {
+try {
+if (array == null || ascendingOrder.length > 0 && 
ascendingOrder[0] == null) {
+return null;
+}

Review Comment:
   From the description, it seems that the output strategy depend on the second 
argument.
   https://github.com/apache/flink/pull/22951#issue-1788425522



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-09 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1484427940


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementArgumentTypeStrategy.java:
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.StructuredType;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * An {@link ArgumentTypeStrategy} that checks if the input argument is an 
ARRAY type and check
+ * whether its' elements are comparable.
+ *
+ * It requires one argument.
+ *
+ * For the rules which types are comparable with which types see {@link
+ * #areComparable(LogicalType, LogicalType)}.
+ */
+public final class ArrayComparableElementArgumentTypeStrategy implements 
ArgumentTypeStrategy {

Review Comment:
   Yes, 
https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java
 
   is also write by myself, but that return type is InputTypeStrategy,
   we need ArgumentTypeStrategy here.
   Maybe we can extract same code as an utils and reuse the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-09 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1484422529


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   Hi, @dawidwys , from the description, if the second argument is null, we 
will return NULL.
   `SELECT ARRAY_SLICE(['a', 'b', 'c', 'd', 'e'], null)
   null
   `
   
   https://github.com/apache/flink/pull/22951#issue-1788425522



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-09 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1484422529


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   Hi, @dawidwys , from the description, if second argument is null, we will 
return NUll.
   `SELECT ARRAY_SLICE(['a', 'b', 'c', 'd', 'e'], null)
   null
   `



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-09 Thread via GitHub


hanyuzheng7 commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1484422529


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##
@@ -231,6 +232,21 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
 
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
 .build();
 
+public static final BuiltInFunctionDefinition ARRAY_SORT =
+BuiltInFunctionDefinition.newBuilder()
+.name("ARRAY_SORT")
+.kind(SCALAR)
+.inputTypeStrategy(
+or(
+sequence(new 
ArrayComparableElementArgumentTypeStrategy()),
+sequence(
+new 
ArrayComparableElementArgumentTypeStrategy(),
+logical(LogicalTypeRoot.BOOLEAN

Review Comment:
   Hi, @dawidwys , from the description, if the second argument is null, we 
will return NULL.
   `SELECT ARRAY_SLICE(['a', 'b', 'c', 'd', 'e'], null)
   null
   `



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-26948][table] Add-ARRAY_SORT-function. [flink]

2024-02-09 Thread via GitHub


dawidwys commented on code in PR #22951:
URL: https://github.com/apache/flink/pull/22951#discussion_r1484177231


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySortFunction.java:
##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of ARRAY_SORT function. */
+@Internal
+public class ArraySortFunction extends BuiltInScalarFunction {
+
+private final ArrayData.ElementGetter elementGetter;
+private final SpecializedFunction.ExpressionEvaluator greaterEvaluator;
+
+private transient MethodHandle greaterHandle;
+
+public ArraySortFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.ARRAY_SORT, context);
+final DataType elementDataType =
+((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType();
+elementGetter = 
ArrayData.createElementGetter(elementDataType.getLogicalType());
+greaterEvaluator =
+context.createEvaluator(
+$("element1").isGreater($("element2")),
+DataTypes.BOOLEAN(),
+DataTypes.FIELD("element1", 
elementDataType.notNull().toInternal()),
+DataTypes.FIELD("element2", 
elementDataType.notNull().toInternal()));
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+greaterHandle = greaterEvaluator.open(context);
+}
+
+public @Nullable ArrayData eval(ArrayData array, Boolean... 
ascendingOrder) {
+try {
+if (array == null || ascendingOrder.length > 0 && 
ascendingOrder[0] == null) {
+return null;
+}

Review Comment:
   This is not covered by the output strategy, is it? The output strategy does 
not depend on the second argument.



##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySortFunction.java:
##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import