Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-06-06 Thread via GitHub


snuyanzin merged PR #24526:
URL: https://github.com/apache/flink/pull/24526


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-06-06 Thread via GitHub


snuyanzin commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1630300887


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayIntersectFunction.java:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider;
+import org.apache.flink.table.runtime.util.ObjectContainer;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_INTERSECT}. */
+@Internal
+public class ArrayIntersectFunction extends BuiltInScalarFunction {
+    private final ArrayData.ElementGetter elementGetter;
+    private final EqualityAndHashcodeProvider equalityAndHashcodeProvider;
+
+    public ArrayIntersectFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.ARRAY_INTERSECT, context);
+        final DataType dataType =
+                ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0))
+                        .getElementDataType()
+                        .toInternal();
+        elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType());
+        this.equalityAndHashcodeProvider = new EqualityAndHashcodeProvider(context, dataType);
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        equalityAndHashcodeProvider.open(context);
+    }
+
+    public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) {
+        try {
+            if (arrayOne == null || arrayTwo == null) {
+                return null;
+            }
+
+            Set<ObjectContainer> set = new HashSet<>();
+            for (int pos = 0; pos < arrayTwo.size(); pos++) {
+                final Object element = elementGetter.getElementOrNull(arrayTwo, pos);
+                final ObjectContainer objectContainer = createObjectContainer(element);
+                set.add(objectContainer);
+            }
+
+            Set<ObjectContainer> res = new LinkedHashSet<>();

Review Comment:
   true






Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-06-06 Thread via GitHub


liuyongvs commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1629113958


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayIntersectFunction.java:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider;
+import org.apache.flink.table.runtime.util.ObjectContainer;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_INTERSECT}. */
+@Internal
+public class ArrayIntersectFunction extends BuiltInScalarFunction {
+    private final ArrayData.ElementGetter elementGetter;
+    private final EqualityAndHashcodeProvider equalityAndHashcodeProvider;
+
+    public ArrayIntersectFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.ARRAY_INTERSECT, context);
+        final DataType dataType =
+                ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0))
+                        .getElementDataType()
+                        .toInternal();
+        elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType());
+        this.equalityAndHashcodeProvider = new EqualityAndHashcodeProvider(context, dataType);
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        equalityAndHashcodeProvider.open(context);
+    }
+
+    public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) {
+        try {
+            if (arrayOne == null || arrayTwo == null) {
+                return null;
+            }
+
+            Set<ObjectContainer> set = new HashSet<>();
+            for (int pos = 0; pos < arrayTwo.size(); pos++) {
+                final Object element = elementGetter.getElementOrNull(arrayTwo, pos);
+                final ObjectContainer objectContainer = createObjectContainer(element);
+                set.add(objectContainer);
+            }
+
+            Set<ObjectContainer> res = new LinkedHashSet<>();
+            for (int pos = 0; pos < arrayOne.size(); pos++) {
+                final Object element = elementGetter.getElementOrNull(arrayOne, pos);
+                final ObjectContainer objectContainer = createObjectContainer(element);
+                if (set.contains(objectContainer)) {
+                    res.add(objectContainer);
+                }
+            }
+
+            return new GenericArrayData(
+                    res.stream()
+                            .map(element -> element != null ? element.getObject() : null)
+                            .toArray());

Review Comment:
   When an element is null, calling `getObject()` on it will throw an NPE @snuyanzin
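
   The null case can be reproduced outside Flink with a small sketch. Everything here is an assumption made for illustration: `Box` is a hypothetical stand-in for `ObjectContainer`, and `wrap` assumes that a null element is represented by a null container, which is what the `element != null` guard in the diff suggests.

```java
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;

public class NullElementSketch {

    /** Hypothetical stand-in for ObjectContainer; not Flink's class. */
    static final class Box {
        private final Object value;

        Box(Object value) {
            this.value = value;
        }

        Object getObject() {
            return value;
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof Box && Objects.equals(value, ((Box) other).value);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(value);
        }
    }

    /** Assumption: a null element is kept as a null container instead of being wrapped. */
    static Box wrap(Object element) {
        return element == null ? null : new Box(element);
    }

    /** Mirrors the guarded lambda from the diff: null containers map to null elements. */
    static Object[] unwrapGuarded(Set<Box> boxes) {
        return boxes.stream().map(box -> box != null ? box.getObject() : null).toArray();
    }

    /** Without the guard, a null container makes the method reference throw an NPE. */
    static Object[] unwrapUnguarded(Set<Box> boxes) {
        return boxes.stream().map(Box::getObject).toArray();
    }

    public static void main(String[] args) {
        Set<Box> boxes = new LinkedHashSet<>();
        boxes.add(wrap(1));
        boxes.add(wrap(null));
        System.out.println(unwrapGuarded(boxes).length); // two elements, the second is null
        try {
            unwrapUnguarded(boxes);
        } catch (NullPointerException e) {
            System.out.println("NPE without the null guard");
        }
    }
}
```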






Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-06-06 Thread via GitHub


liuyongvs commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1629112803


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayIntersectFunction.java:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider;
+import org.apache.flink.table.runtime.util.ObjectContainer;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_INTERSECT}. */
+@Internal
+public class ArrayIntersectFunction extends BuiltInScalarFunction {
+    private final ArrayData.ElementGetter elementGetter;
+    private final EqualityAndHashcodeProvider equalityAndHashcodeProvider;
+
+    public ArrayIntersectFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.ARRAY_INTERSECT, context);
+        final DataType dataType =
+                ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0))
+                        .getElementDataType()
+                        .toInternal();
+        elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType());
+        this.equalityAndHashcodeProvider = new EqualityAndHashcodeProvider(context, dataType);
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        equalityAndHashcodeProvider.open(context);
+    }
+
+    public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) {
+        try {
+            if (arrayOne == null || arrayTwo == null) {
+                return null;
+            }
+
+            Set<ObjectContainer> set = new HashSet<>();
+            for (int pos = 0; pos < arrayTwo.size(); pos++) {
+                final Object element = elementGetter.getElementOrNull(arrayTwo, pos);
+                final ObjectContainer objectContainer = createObjectContainer(element);
+                set.add(objectContainer);
+            }
+
+            Set<ObjectContainer> res = new LinkedHashSet<>();

Review Comment:
   > if a record present in `set` then add it to array list otherwise add it to `set`

   How would that deduplicate?
   For example, with array1: [a, a] and array2: [b, a],
   how would the duplicate a from array1 be removed? @snuyanzin
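
   For the example in this comment, the `LinkedHashSet` used in the diff already removes the duplicate: adding an element a second time is a no-op, and insertion order is preserved. A minimal sketch with plain strings in place of `ObjectContainer` (the class `LinkedHashSetIntersectSketch` and its `intersect` method are hypothetical, not part of the PR):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class LinkedHashSetIntersectSketch {

    /** Keeps the elements of arrayOne that also occur in arrayTwo, once each, in arrayOne order. */
    static List<String> intersect(List<String> arrayOne, List<String> arrayTwo) {
        // Lookup set built from the second array, as in the diff.
        Set<String> lookup = new HashSet<>(arrayTwo);

        // LinkedHashSet ignores repeated adds, so duplicates in arrayOne collapse.
        Set<String> result = new LinkedHashSet<>();
        for (String element : arrayOne) {
            if (lookup.contains(element)) {
                result.add(element);
            }
        }
        return new ArrayList<>(result);
    }

    public static void main(String[] args) {
        // array1: a, a and array2: b, a
        System.out.println(intersect(Arrays.asList("a", "a"), Arrays.asList("b", "a"))); // prints [a]
    }
}
```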






Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-06-06 Thread via GitHub


snuyanzin commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1629035418


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/util/ObjectContainer.java:
##
@@ -31,21 +31,25 @@
 @Internal
 public class ObjectContainer {
 
-private final Object o;
+private final Object object;

Review Comment:
   I guess we do not need it if we use just ArrayList as mentioned above
   
   Or did I miss anything?






Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-06-06 Thread via GitHub


snuyanzin commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1629032823


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayIntersectFunction.java:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider;
+import org.apache.flink.table.runtime.util.ObjectContainer;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_INTERSECT}. */
+@Internal
+public class ArrayIntersectFunction extends BuiltInScalarFunction {
+    private final ArrayData.ElementGetter elementGetter;
+    private final EqualityAndHashcodeProvider equalityAndHashcodeProvider;
+
+    public ArrayIntersectFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.ARRAY_INTERSECT, context);
+        final DataType dataType =
+                ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0))
+                        .getElementDataType()
+                        .toInternal();
+        elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType());
+        this.equalityAndHashcodeProvider = new EqualityAndHashcodeProvider(context, dataType);
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        equalityAndHashcodeProvider.open(context);
+    }
+
+    public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) {
+        try {
+            if (arrayOne == null || arrayTwo == null) {
+                return null;
+            }
+
+            Set<ObjectContainer> set = new HashSet<>();
+            for (int pos = 0; pos < arrayTwo.size(); pos++) {
+                final Object element = elementGetter.getElementOrNull(arrayTwo, pos);
+                final ObjectContainer objectContainer = createObjectContainer(element);
+                set.add(objectContainer);
+            }
+
+            Set<ObjectContainer> res = new LinkedHashSet<>();
+            for (int pos = 0; pos < arrayOne.size(); pos++) {
+                final Object element = elementGetter.getElementOrNull(arrayOne, pos);
+                final ObjectContainer objectContainer = createObjectContainer(element);
+                if (set.contains(objectContainer)) {
+                    res.add(objectContainer);
+                }
+            }
+
+            return new GenericArrayData(
+                    res.stream()
+                            .map(element -> element != null ? element.getObject() : null)
+                            .toArray());

Review Comment:
   Is there anything blocking us from using just `toArray`?






Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-06-06 Thread via GitHub


snuyanzin commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1629030390


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayIntersectFunction.java:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider;
+import org.apache.flink.table.runtime.util.ObjectContainer;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_INTERSECT}. */
+@Internal
+public class ArrayIntersectFunction extends BuiltInScalarFunction {
+    private final ArrayData.ElementGetter elementGetter;
+    private final EqualityAndHashcodeProvider equalityAndHashcodeProvider;
+
+    public ArrayIntersectFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.ARRAY_INTERSECT, context);
+        final DataType dataType =
+                ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0))
+                        .getElementDataType()
+                        .toInternal();
+        elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType());
+        this.equalityAndHashcodeProvider = new EqualityAndHashcodeProvider(context, dataType);
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        equalityAndHashcodeProvider.open(context);
+    }
+
+    public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) {
+        try {
+            if (arrayOne == null || arrayTwo == null) {
+                return null;
+            }
+
+            Set<ObjectContainer> set = new HashSet<>();
+            for (int pos = 0; pos < arrayTwo.size(); pos++) {
+                final Object element = elementGetter.getElementOrNull(arrayTwo, pos);
+                final ObjectContainer objectContainer = createObjectContainer(element);
+                set.add(objectContainer);
+            }
+
+            Set<ObjectContainer> res = new LinkedHashSet<>();

Review Comment:
   nit:
   do we really need `LinkedHashSet` here?
   I guess we could use `ArrayList` to achieve the same result, which is more space efficient:
   if a record is present in `set`, add it to the array list, otherwise add it to `set`.
   WDYT?
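
   One possible reading of this suggestion, sketched with plain `java.util` collections rather than Flink's `ArrayData` and `ObjectContainer`. The class `ListIntersectSketch` and the method `intersect` are hypothetical, and the remove-on-first-match step is an assumption about how deduplication could work with an `ArrayList`; it is not taken from the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ListIntersectSketch {

    /** Keeps the elements of arrayOne that also occur in arrayTwo, once each, in arrayOne order. */
    static <T> List<T> intersect(List<T> arrayOne, List<T> arrayTwo) {
        // Elements of arrayTwo that have not been matched yet.
        Set<T> remaining = new HashSet<>(arrayTwo);

        List<T> result = new ArrayList<>();
        for (T element : arrayOne) {
            // remove() returns true only for the first match, so a repeated
            // element in arrayOne is appended to the list at most once.
            if (remaining.remove(element)) {
                result.add(element);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(intersect(Arrays.asList("a", "a"), Arrays.asList("b", "a"))); // prints [a]
    }
}
```

   The trade-off in this sketch is that the lookup set is consumed while iterating, so it cannot be reused for another call.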
   






Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-06-05 Thread via GitHub


liuyongvs commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2151255284

   CI passed. @snuyanzin @dawidwys @MartijnVisser, could you take another look?





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-06-05 Thread via GitHub


liuyongvs commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2149197172

   @MartijnVisser I have rebased to fix CI.





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-06-05 Thread via GitHub


MartijnVisser commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2149077614

   @liuyongvs I think you need to rebase in order to get the CI to pass





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-06-04 Thread via GitHub


liuyongvs commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2148729746

   @snuyanzin I have addressed your review comments, thanks very much.





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-06-04 Thread via GitHub


liuyongvs commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1626831821


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java:
##
@@ -1723,6 +1724,83 @@ private Stream arrayExceptTestCases() {
 + "ARRAY_EXCEPT(, )"));
 }
 
+    private Stream<TestSetSpec> arrayIntersectTestCases() {
+        return Stream.of(
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_INTERSECT)
+                        .onFieldsWithData(
+                                new Integer[] {1, 1, 2},
+                                null,
+                                new Row[] {Row.of(true, 1), Row.of(true, 2), null},
+                                new Integer[] {null, null, 1},
+                                new Map[] {
+                                    CollectionUtil.map(entry(1, "a"), entry(2, "b")),
+                                    CollectionUtil.map(entry(3, "c"), entry(4, "d"))
+                                },
+                                new Integer[][] {new Integer[] {1, 2, 3}})
+                        .andDataTypes(
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(
+                                        DataTypes.ROW(DataTypes.BOOLEAN(), DataTypes.INT())),
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())),
+                                DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())))
+                        // ARRAY<INT>
+                        .testResult(
+                                $("f0").arrayIntersect(new Integer[] {1, null, 4}),
+                                "ARRAY_INTERSECT(f0, ARRAY[1, NULL, 4])",
+                                new Integer[] {1, 1},
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                $("f0").arrayIntersect(new Integer[] {3, 4}),
+                                "ARRAY_INTERSECT(f0, ARRAY[3, 4])",
+                                new Integer[] {},
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                $("f1").arrayIntersect(new Integer[] {1, null, 4}),
+                                "ARRAY_INTERSECT(f1, ARRAY[1, NULL, 4])",
+                                null,
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        // ARRAY<ROW<BOOLEAN, INT>>
+                        .testResult(
+                                $("f2").arrayIntersect(
+                                        new Row[] {
+                                            null, Row.of(true, 2),
+                                        }),
+                                "ARRAY_INTERSECT(f2, ARRAY[NULL, ROW(TRUE, 2)])",
+                                new Row[] {Row.of(true, 2), null},
+                                DataTypes.ARRAY(
+                                        DataTypes.ROW(DataTypes.BOOLEAN(), DataTypes.INT())))
+                        // arrayOne contains null elements
+                        .testResult(
+                                $("f3").arrayIntersect(new Integer[] {null, 42}),
+                                "ARRAY_INTERSECT(f3, ARRAY[null, 42])",
+                                new Integer[] {null, null},
Review Comment:
   fixed @snuyanzin 






Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-06-04 Thread via GitHub


liuyongvs commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1626831685


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java:
##
@@ -1723,6 +1724,83 @@ private Stream arrayExceptTestCases() {
 + "ARRAY_EXCEPT(, )"));
 }
 
+    private Stream<TestSetSpec> arrayIntersectTestCases() {
+        return Stream.of(
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_INTERSECT)
+                        .onFieldsWithData(
+                                new Integer[] {1, 1, 2},
+                                null,
+                                new Row[] {Row.of(true, 1), Row.of(true, 2), null},
+                                new Integer[] {null, null, 1},
+                                new Map[] {
+                                    CollectionUtil.map(entry(1, "a"), entry(2, "b")),
+                                    CollectionUtil.map(entry(3, "c"), entry(4, "d"))
+                                },
+                                new Integer[][] {new Integer[] {1, 2, 3}})
+                        .andDataTypes(
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(
+                                        DataTypes.ROW(DataTypes.BOOLEAN(), DataTypes.INT())),
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())),
+                                DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())))
+                        // ARRAY<INT>
+                        .testResult(
+                                $("f0").arrayIntersect(new Integer[] {1, null, 4}),
+                                "ARRAY_INTERSECT(f0, ARRAY[1, NULL, 4])",
+                                new Integer[] {1, 1},

Review Comment:
   @snuyanzin  fixed






Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-06-04 Thread via GitHub


snuyanzin commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1625447988


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java:
##
@@ -1723,6 +1724,83 @@ private Stream arrayExceptTestCases() {
 + "ARRAY_EXCEPT(, )"));
 }
 
+    private Stream<TestSetSpec> arrayIntersectTestCases() {
+        return Stream.of(
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_INTERSECT)
+                        .onFieldsWithData(
+                                new Integer[] {1, 1, 2},
+                                null,
+                                new Row[] {Row.of(true, 1), Row.of(true, 2), null},
+                                new Integer[] {null, null, 1},
+                                new Map[] {
+                                    CollectionUtil.map(entry(1, "a"), entry(2, "b")),
+                                    CollectionUtil.map(entry(3, "c"), entry(4, "d"))
+                                },
+                                new Integer[][] {new Integer[] {1, 2, 3}})
+                        .andDataTypes(
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(
+                                        DataTypes.ROW(DataTypes.BOOLEAN(), DataTypes.INT())),
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())),
+                                DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())))
+                        // ARRAY<INT>
+                        .testResult(
+                                $("f0").arrayIntersect(new Integer[] {1, null, 4}),
+                                "ARRAY_INTERSECT(f0, ARRAY[1, NULL, 4])",
+                                new Integer[] {1, 1},

Review Comment:
   In the docs it is mentioned:
   > Returns an ARRAY that contains the elements from array1 that are also in array2, without duplicates.
   
   Why do we expect duplicates here?
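
   The documented wording can be checked against the inputs of this test case in a few lines. This is a sketch on boxed integers with `java.util.stream`, not the Flink implementation; the class `IntersectContractSketch` and its `intersect` method are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class IntersectContractSketch {

    /** Elements of arrayOne that are also in arrayTwo, without duplicates. */
    static List<Integer> intersect(List<Integer> arrayOne, List<Integer> arrayTwo) {
        return arrayOne.stream()
                .filter(arrayTwo::contains) // keep only elements present in arrayTwo
                .distinct()                 // drop repeats, keeping first-seen order
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // f0 = [1, 1, 2] against ARRAY[1, NULL, 4]
        System.out.println(intersect(Arrays.asList(1, 1, 2), Arrays.asList(1, null, 4))); // prints [1]
        // f3 = [null, null, 1] against ARRAY[null, 42]
        System.out.println(intersect(Arrays.asList(null, null, 1), Arrays.asList(null, 42))); // prints [null]
    }
}
```

   Under that wording, the results for these inputs would be `[1]` and `[null]` rather than `[1, 1]` and `[null, null]`.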






Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-06-04 Thread via GitHub


snuyanzin commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1625448219


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java:
##
@@ -1723,6 +1724,83 @@ private Stream arrayExceptTestCases() {
 + "ARRAY_EXCEPT(, )"));
 }
 
+    private Stream<TestSetSpec> arrayIntersectTestCases() {
+        return Stream.of(
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_INTERSECT)
+                        .onFieldsWithData(
+                                new Integer[] {1, 1, 2},
+                                null,
+                                new Row[] {Row.of(true, 1), Row.of(true, 2), null},
+                                new Integer[] {null, null, 1},
+                                new Map[] {
+                                    CollectionUtil.map(entry(1, "a"), entry(2, "b")),
+                                    CollectionUtil.map(entry(3, "c"), entry(4, "d"))
+                                },
+                                new Integer[][] {new Integer[] {1, 2, 3}})
+                        .andDataTypes(
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(
+                                        DataTypes.ROW(DataTypes.BOOLEAN(), DataTypes.INT())),
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())),
+                                DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())))
+                        // ARRAY<INT>
+                        .testResult(
+                                $("f0").arrayIntersect(new Integer[] {1, null, 4}),
+                                "ARRAY_INTERSECT(f0, ARRAY[1, NULL, 4])",
+                                new Integer[] {1, 1},
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                $("f0").arrayIntersect(new Integer[] {3, 4}),
+                                "ARRAY_INTERSECT(f0, ARRAY[3, 4])",
+                                new Integer[] {},
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                $("f1").arrayIntersect(new Integer[] {1, null, 4}),
+                                "ARRAY_INTERSECT(f1, ARRAY[1, NULL, 4])",
+                                null,
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        // ARRAY<ROW<BOOLEAN, INT>>
+                        .testResult(
+                                $("f2").arrayIntersect(
+                                        new Row[] {
+                                            null, Row.of(true, 2),
+                                        }),
+                                "ARRAY_INTERSECT(f2, ARRAY[NULL, ROW(TRUE, 2)])",
+                                new Row[] {Row.of(true, 2), null},
+                                DataTypes.ARRAY(
+                                        DataTypes.ROW(DataTypes.BOOLEAN(), DataTypes.INT())))
+                        // arrayOne contains null elements
+                        .testResult(
+                                $("f3").arrayIntersect(new Integer[] {null, 42}),
+                                "ARRAY_INTERSECT(f3, ARRAY[null, 42])",
+                                new Integer[] {null, null},

Review Comment:
   same here






Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-26 Thread via GitHub


liuyongvs commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1615393266


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayIntersectFunction.java:
##
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider;
+import org.apache.flink.table.runtime.util.ObjectContainer;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_INTERSECT}. */
+@Internal
+public class ArrayIntersectFunction extends BuiltInScalarFunction {
+private final ArrayData.ElementGetter elementGetter;
+private final EqualityAndHashcodeProvider equalityAndHashcodeProvider;
+
+public ArrayIntersectFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.ARRAY_INTERSECT, context);
+final DataType dataType =
+((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType()
+.toInternal();
+elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType());
+this.equalityAndHashcodeProvider = new EqualityAndHashcodeProvider(context, dataType);
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+equalityAndHashcodeProvider.open(context);
+}
+
+public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) {
+try {
+if (arrayOne == null || arrayTwo == null) {
+return null;
+}
+
+List<Object> list = new ArrayList<>();
+Set<ObjectContainer> set = new HashSet<>();
+for (int pos = 0; pos < arrayTwo.size(); pos++) {
+final Object element = elementGetter.getElementOrNull(arrayTwo, pos);
+final ObjectContainer objectContainer = createObjectContainer(element);
+set.add(objectContainer);
+}
+for (int pos = 0; pos < arrayOne.size(); pos++) {
+final Object element = elementGetter.getElementOrNull(arrayOne, pos);
+final ObjectContainer objectContainer = createObjectContainer(element);
+if (set.contains(objectContainer)) {
+list.add(element);
+}
+}
+return new GenericArrayData(list.toArray());
+} catch (Throwable t) {
+throw new FlinkRuntimeException(t);

Review Comment:
   All the built-in function implementations do the same, @davidradl.






Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-24 Thread via GitHub


davidradl commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1613069596


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayIntersectFunction.java:
##
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider;
+import org.apache.flink.table.runtime.util.ObjectContainer;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_INTERSECT}. */
+@Internal
+public class ArrayIntersectFunction extends BuiltInScalarFunction {
+private final ArrayData.ElementGetter elementGetter;
+private final EqualityAndHashcodeProvider equalityAndHashcodeProvider;
+
+public ArrayIntersectFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.ARRAY_INTERSECT, context);
+final DataType dataType =
+((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType()
+.toInternal();
+elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType());
+this.equalityAndHashcodeProvider = new EqualityAndHashcodeProvider(context, dataType);
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+equalityAndHashcodeProvider.open(context);
+}
+
+public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) {
+try {
+if (arrayOne == null || arrayTwo == null) {
+return null;
+}
+
+List<Object> list = new ArrayList<>();
+Set<ObjectContainer> set = new HashSet<>();
+for (int pos = 0; pos < arrayTwo.size(); pos++) {
+final Object element = elementGetter.getElementOrNull(arrayTwo, pos);
+final ObjectContainer objectContainer = createObjectContainer(element);
+set.add(objectContainer);
+}
+for (int pos = 0; pos < arrayOne.size(); pos++) {
+final Object element = elementGetter.getElementOrNull(arrayOne, pos);
+final ObjectContainer objectContainer = createObjectContainer(element);
+if (set.contains(objectContainer)) {
+list.add(element);
+}
+}
+return new GenericArrayData(list.toArray());
+} catch (Throwable t) {
+throw new FlinkRuntimeException(t);

Review Comment:
   From the discussions in 
[https://github.com/apache/flink/pull/24773](https://github.com/apache/flink/pull/24773)
 I wonder if we should return null on a runtime exception. For this case null 
is returned if we cannot do an intersect, so it would make sense to return a 
null if there is a runtime error.  From your tests this will occur if the types 
do not match in array 1 or array 2 - it could be argued that there is no 
intersection in this case so return null. WDYT?
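
   For comparison, the alternative raised in this comment (return null instead of rethrowing) could look roughly like the sketch below. It is only an illustration under stated assumptions: `NullOnErrorSketch` and `nullOnRuntimeError` are hypothetical names, `java.util.logging` stands in for whatever logger the function would actually use, and the code in this PR wraps the failure in `FlinkRuntimeException` rather than doing this.

```java
import java.util.function.BinaryOperator;
import java.util.logging.Level;
import java.util.logging.Logger;

public class NullOnErrorSketch {
    private static final Logger LOG = Logger.getLogger(NullOnErrorSketch.class.getName());

    /** Wraps a two-argument function so that a RuntimeException is logged and mapped to null. */
    public static <T> BinaryOperator<T> nullOnRuntimeError(BinaryOperator<T> delegate) {
        return (left, right) -> {
            try {
                return delegate.apply(left, right);
            } catch (RuntimeException e) {
                // Hypothetical policy: swallow the error, keep a trace in the log.
                LOG.log(Level.WARNING, "Evaluation failed, returning null", e);
                return null;
            }
        };
    }

    public static void main(String[] args) {
        // Integer division stands in for an evaluation that may fail at runtime.
        BinaryOperator<Integer> safeDivide =
                NullOnErrorSketch.<Integer>nullOnRuntimeError((a, b) -> a / b);
        System.out.println(safeDivide.apply(6, 3)); // 2
        System.out.println(safeDivide.apply(1, 0)); // null (ArithmeticException is logged)
    }
}
```

   The trade-off is that a null result from a failure becomes indistinguishable from the documented null result for a NULL argument, which is one reason to keep the rethrow.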






Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-24 Thread via GitHub


davidradl commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1613069596


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayIntersectFunction.java:
##
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider;
+import org.apache.flink.table.runtime.util.ObjectContainer;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_INTERSECT}. */
+@Internal
+public class ArrayIntersectFunction extends BuiltInScalarFunction {
+private final ArrayData.ElementGetter elementGetter;
+private final EqualityAndHashcodeProvider equalityAndHashcodeProvider;
+
+public ArrayIntersectFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.ARRAY_INTERSECT, context);
+final DataType dataType =
+((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType()
+.toInternal();
+elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType());
+this.equalityAndHashcodeProvider = new EqualityAndHashcodeProvider(context, dataType);
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+equalityAndHashcodeProvider.open(context);
+}
+
+public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) {
+try {
+if (arrayOne == null || arrayTwo == null) {
+return null;
+}
+
+List<Object> list = new ArrayList<>();
+Set<ObjectContainer> set = new HashSet<>();
+for (int pos = 0; pos < arrayTwo.size(); pos++) {
+final Object element = elementGetter.getElementOrNull(arrayTwo, pos);
+final ObjectContainer objectContainer = createObjectContainer(element);
+set.add(objectContainer);
+}
+for (int pos = 0; pos < arrayOne.size(); pos++) {
+final Object element = elementGetter.getElementOrNull(arrayOne, pos);
+final ObjectContainer objectContainer = createObjectContainer(element);
+if (set.contains(objectContainer)) {
+list.add(element);
+}
+}
+return new GenericArrayData(list.toArray());
+} catch (Throwable t) {
+throw new FlinkRuntimeException(t);

Review Comment:
   From the discussions in 
[https://github.com/apache/flink/pull/24773](https://github.com/apache/flink/pull/24773)
 I wonder if we should return null on a runtime exception. For this case null 
is returned if we cannot do an intersect, so it would make sense to return a 
null if there is a runtime error. I therefore suggest catching 
RuntimeException, logging, and returning null.






Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-24 Thread via GitHub


davidradl commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1613069596


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayIntersectFunction.java:
##
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.runtime.util.EqualityAndHashcodeProvider;
+import org.apache.flink.table.runtime.util.ObjectContainer;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_INTERSECT}. */
+@Internal
+public class ArrayIntersectFunction extends BuiltInScalarFunction {
+private final ArrayData.ElementGetter elementGetter;
+private final EqualityAndHashcodeProvider equalityAndHashcodeProvider;
+
+public ArrayIntersectFunction(SpecializedFunction.SpecializedContext context) {
+super(BuiltInFunctionDefinitions.ARRAY_INTERSECT, context);
+final DataType dataType =
+((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0))
+.getElementDataType()
+.toInternal();
+elementGetter = ArrayData.createElementGetter(dataType.toInternal().getLogicalType());
+this.equalityAndHashcodeProvider = new EqualityAndHashcodeProvider(context, dataType);
+}
+
+@Override
+public void open(FunctionContext context) throws Exception {
+equalityAndHashcodeProvider.open(context);
+}
+
+public @Nullable ArrayData eval(ArrayData arrayOne, ArrayData arrayTwo) {
+try {
+if (arrayOne == null || arrayTwo == null) {
+return null;
+}
+
+List<Object> list = new ArrayList<>();
+Set<ObjectContainer> set = new HashSet<>();
+for (int pos = 0; pos < arrayTwo.size(); pos++) {
+final Object element = elementGetter.getElementOrNull(arrayTwo, pos);
+final ObjectContainer objectContainer = createObjectContainer(element);
+set.add(objectContainer);
+}
+for (int pos = 0; pos < arrayOne.size(); pos++) {
+final Object element = elementGetter.getElementOrNull(arrayOne, pos);
+final ObjectContainer objectContainer = createObjectContainer(element);
+if (set.contains(objectContainer)) {
+list.add(element);
+}
+}
+return new GenericArrayData(list.toArray());
+} catch (Throwable t) {
+throw new FlinkRuntimeException(t);

Review Comment:
   From the discussions in 
[https://github.com/apache/flink/pull/24773](https://github.com/apache/flink/pull/24773)
 I wonder if we should return null on a runtime exception. For this case null 
seems to be what is returned if we cannot do an intersect, so it would make 
sense to return a null if there is a runtime error. I therefore suggest 
catching RuntimeException, logging, and returning null.






Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-23 Thread via GitHub


liuyongvs commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1612588115


##
.idea/vcs.xml:
##
@@ -22,4 +22,4 @@
 
 
   

Review Comment:
   removed @davidradl






Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-23 Thread via GitHub


liuyongvs commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1612587871


##
flink-python/pyflink/table/expression.py:
##
@@ -1618,6 +1618,15 @@ def array_except(self, array) -> 'Expression':
 """
 return _binary_op("arrayExcept")(self, array)
 
+def array_intersect(self, array) -> 'Expression':
+"""
+Returns an ARRAY that contains the elements from array1 that are also in array2.
+If no elements are both in array1 and array2, the function returns an empty ARRAY.
+If one or both arguments are NULL, the function returns NULL.
+The order of the elements from array1 is kept.

Review Comment:
   Added a mention that the result contains no duplicates, as for array_union.






Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-23 Thread via GitHub


liuyongvs commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1612586021


##
flink-python/pyflink/table/expression.py:
##
@@ -1618,6 +1618,15 @@ def array_except(self, array) -> 'Expression':
 """
 return _binary_op("arrayExcept")(self, array)
 
+def array_intersect(self, array) -> 'Expression':
+"""
+Returns an ARRAY that contains the elements from array1 that are also in array2.
+If no elements are both in array1 and array2, the function returns an empty ARRAY.
+If one or both arguments are NULL, the function returns NULL.
+The order of the elements from array1 is kept.

Review Comment:
   Added a mention that the result contains no duplicates, as for array_union.






Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-23 Thread via GitHub


davidradl commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1611987986


##
.idea/vcs.xml:
##
@@ -22,4 +22,4 @@
 
 
   

Review Comment:
   Can we remove this file from the PR?






Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-23 Thread via GitHub


davidradl commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1611983897


##
flink-python/pyflink/table/expression.py:
##
@@ -1618,6 +1618,15 @@ def array_except(self, array) -> 'Expression':
 """
 return _binary_op("arrayExcept")(self, array)
 
+def array_intersect(self, array) -> 'Expression':
+"""
+Returns an ARRAY that contains the elements from array1 that are also in array2.
+If no elements are both in array1 and array2, the function returns an empty ARRAY.
+If one or both arguments are NULL, the function returns NULL.
+The order of the elements from array1 is kept.

Review Comment:
   This should mention the deduplication behaviour.






Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-23 Thread via GitHub


davidradl commented on code in PR #24526:
URL: https://github.com/apache/flink/pull/24526#discussion_r1611982523


##
docs/data/sql_functions.yml:
##
@@ -688,6 +688,9 @@ collection:
   - sql: ARRAY_EXCEPT(array1, array2)
 table: arrayOne.arrayExcept(arrayTwo)
 description: Returns an ARRAY that contains the elements from array1 that are not in array2. If no elements remain after excluding the elements in array2 from array1, the function returns an empty ARRAY. If one or both arguments are NULL, the function returns NULL. The order of the elements from array1 is kept.
+  - sql: ARRAY_INTERSECT(array1, array2)
+table: array1.arrayIntersect(array2)
+description: Returns an ARRAY that contains the elements from array1 that are also in array2. If no elements that are both in array1 and array2, the function returns an empty ARRAY. If any of the array is null, the function will return null. The order of the elements from array1 is kept.

Review Comment:
   We should mention here that the result does not contain duplicates, as per 
the agreement I see in the PR conversation.






Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-22 Thread via GitHub


liuyongvs commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2126150641

   Fixed the conflicts. @twalthr @dawidwys @snuyanzin, could you help review 
this PR?





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-22 Thread via GitHub


liuyongvs commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2126150525

   Conclusion:
   Since there are no objections, then we will support it with deduplication 
semantics.
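
   To make the agreed behaviour concrete, here is a minimal sketch of an order-preserving intersection with deduplication. It is not the code from this PR: it works on `java.util.List` and plain `equals`/`hashCode` instead of `ArrayData` and `EqualityAndHashcodeProvider`, and the names `DedupIntersectSketch` and `intersectDistinct` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DedupIntersectSketch {

    /** Returns the distinct elements of arrayOne that also occur in arrayTwo, in arrayOne order. */
    public static <T> List<T> intersectDistinct(List<T> arrayOne, List<T> arrayTwo) {
        // A NULL argument yields NULL, as the documentation in this PR describes.
        if (arrayOne == null || arrayTwo == null) {
            return null;
        }
        Set<T> lookup = new HashSet<>(arrayTwo); // HashSet accepts null elements
        Set<T> alreadyEmitted = new HashSet<>();
        List<T> result = new ArrayList<>();
        for (T element : arrayOne) {
            // Set.add returns false for a repeat, so each value is emitted once.
            if (lookup.contains(element) && alreadyEmitted.add(element)) {
                result.add(element);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(intersectDistinct(Arrays.asList(1, 1, 2), Arrays.asList(1, null, 4))); // [1]
        System.out.println(intersectDistinct(Arrays.asList(1, 1, 2), Arrays.asList(3, 4))); // []
    }
}
```

   Under these semantics the first example yields a single `1` even though `1` occurs twice in the first array.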





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-22 Thread via GitHub


liuyongvs commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2124700300

   +1 for deduplication





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-22 Thread via GitHub


twalthr commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2124641140

   This is indeed a tricky one. I also spent a significant amount of time on it. 
Every vendor does it differently. I also looked at programming languages such 
as PHP and C#. For example, C# removes duplicates.
   
   In general, having an INTERSECT on array types sounds weird. Intersection is 
a concept from set theory:
   https://en.wikipedia.org/wiki/Intersection_(set_theory)
   
   But arrays are not sets. Since we also deduplicate unions (another term from 
set theory), I would vote for deduplication.
   
   





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-22 Thread via GitHub


snuyanzin commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2124050180

   So far I see this kind of trade-off.
   
   ### Multi-set semantics (with duplicates)
   Pros
   1. It covers more cases, as mentioned above.
   2. `array_except` is already implemented with these semantics, so it would 
make sense to follow the same approach.
   P.S. Earlier I wrote why I do not consider `array_union`.

   Cons
   1. Of the well-known vendors, only Snowflake supports it.

   It would be great to see other opinions.





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-22 Thread via GitHub


liuyongvs commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2123939160

   @snuyanzin Indeed, while this might only result in a difference in function 
behavior, it would generally be best to align with the practices of the 
majority of other engines. If in the future there arises a demand to preserve 
duplicates, or if other engines also adopt this behavior, we could introduce a 
switch within the function to support the option to either preserve or remove 
duplicates. What do you think?





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-21 Thread via GitHub


snuyanzin commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2123898530

   >@snuyanzin @MartijnVisser Why do we necessarily have to align our semantics 
with Snowflake?
   
   As mentioned above, the main reason is that multi-set semantics 
(Snowflake) make it possible to handle cases with duplicates, e.g. 
   ```sql
   SELECT array_count(array_intersect(array('A', 'B', 'B'), array('B', 'B', 'B'))); -- returns 2
   ```
   and without
   ```sql
   SELECT array_count(array_distinct(array_intersect(array('A', 'B', 'B'), array('B', 'B', 'B')))); -- returns 1
   ```
   
   And it seems Spark and others couldn't calculate amount of duplicates with 
their semantics and I would consider it as a main drawback of their approach, 
please correct me if I'm wrong @liuyongvs 
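
   The two queries above can be mirrored in plain Java to show where the counts 2 and 1 come from. This is a sketch under assumptions: it reads multi-set semantics as "keep each value min(count in first array, count in second array) times", and `MultisetIntersectSketch` and `intersectMultiset` are hypothetical names, not Flink or Snowflake APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MultisetIntersectSketch {

    /** Multi-set intersection: a value is kept min(count in a, count in b) times. */
    public static <T> List<T> intersectMultiset(List<T> a, List<T> b) {
        // Count how many copies of each value the second array can still match.
        Map<T, Integer> remaining = new HashMap<>();
        for (T element : b) {
            remaining.merge(element, 1, Integer::sum);
        }
        List<T> result = new ArrayList<>();
        for (T element : a) {
            int left = remaining.getOrDefault(element, 0);
            if (left > 0) {
                result.add(element);
                remaining.put(element, left - 1); // consume one matching copy
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> common =
                intersectMultiset(Arrays.asList("A", "B", "B"), Arrays.asList("B", "B", "B"));
        System.out.println(common.size()); // 2, duplicates kept
        System.out.println(common.stream().distinct().count()); // 1, after deduplication
    }
}
```

   The deduplicated result can always be derived from the multi-set result, but not the other way round, which is the asymmetry this comment points at.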





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-21 Thread via GitHub


liuyongvs commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2123732921

   @snuyanzin @MartijnVisser  Why do we necessarily have to align our semantics 
with Snowflake?
   
   I found that Spark, Presto, Doris, and MaxCompute all follow the semantics without 
duplicates. 
   I personally still think that aligning behavior with the majority of vendors 
might be better, as it would not cause too much confusion for users when they 
switch to a different engine. Unless the behavior is very unreasonable, we 
should not need to change the semantics. What do you think?
   
   https://prestodb.io/docs/current/functions/array.html#array_intersect
   
https://doris.apache.org/docs/1.2/sql-manual/sql-functions/array-functions/array_intersect/
   https://www.alibabacloud.com/help/en/maxcompute/user-guide/array-intersect
   





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-21 Thread via GitHub


snuyanzin commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2122481259

   Based on what I mentioned above, how about following multi-set semantics like 
Snowflake by default?
   It would then cover more cases (with and without duplicates).
   
`array_union`/`array_concat` will remain an exception, since it looks like 
an exception across lots of the vendors.





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-21 Thread via GitHub


MartijnVisser commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2122443374

   > I failed to find standard approach for collections like arrays (I mean in 
SQL Standard).
   
   Same. I don't think it's documented anywhere. So how do we come to a 
conclusion (given that the 1.20 release date is coming to us rather quickly)?





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-10 Thread via GitHub


snuyanzin commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2105169563

   Maybe it is an unpopular opinion,
   however I tend to think that `INTERSECT` vs `INTERSECT ALL` (and the same for 
the other operators with set and bag semantics) is defined for rows and can hardly be 
applied to collections. I failed to find a standard approach for collections 
like arrays (I mean in the SQL Standard). 
   That is probably one of the reasons why a number of vendors 
handle this differently. On the one hand, we could say that we should follow the 
same approach as for rows. The problem I see here is that by default we would 
remove duplicates, but what should we do if we want to keep them? There is 
no well-known vendor providing both `array_intersect` and `array_intersect_all`, 
or a parameter to keep/remove duplicates. 
   On the other hand, if we keep duplicates then we will still be able to cover 
both cases:
   for the case with duplicates we can do 
   ```
   array_intersect(array1, array2)
   ```
   and for case without
   ```
   array_distinct(array_intersect(array1, array2))
   ```
   
   Yes, `array_union` looks like an exception here, however if we compare 
against vendors then it is a global exception, just because there is a nice 
synonym, `array_concat`, which is used for another function. So if we want to 
concat arrays without duplicates we use `array_union`, and otherwise `array_concat`. 
The problem is that not every function has such a workaround.





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-10 Thread via GitHub


liuyongvs commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2104468195

   hi @dawidwys @snuyanzin WDYT?





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-04-17 Thread via GitHub


MartijnVisser commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2060771945

   > Furthermore, array_except was merged in version 1.20, and since 1.20 is 
currently only a snapshot version and not officially released, there’s no 
concern of causing compatibility issues due to changing behaviors, so it should 
indeed be corrected. What are your thoughts?
   
   I think you're right. @dawidwys @snuyanzin WDYT?





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-04-07 Thread via GitHub


liuyongvs commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2041370638

   hi @MartijnVisser 





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-03-28 Thread via GitHub


liuyongvs commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2026479264

   @MartijnVisser @dawidwys @snuyanzin I agree with you. That is to say, 
aligning the semantics with Spark's is clear: no duplicate elements are included. 
Consequently, I believe it is also necessary to maintain consistency for 
array_union/array_except. As of now, array_union has been developed by me, with 
semantics and behavior aligned with Spark, meaning no duplicates are included. 
However, array_except (https://issues.apache.org/jira/browse/FLINK-31663) does 
include duplicates, so its semantics do not align and it requires 
modification. Furthermore, array_except was merged in version 1.20, and since 
1.20 is currently only a snapshot version and not officially released, there’s 
no concern of causing compatibility issues due to changing behaviors, so it 
should indeed be corrected. What are your thoughts?
   





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-03-28 Thread via GitHub


MartijnVisser commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2025213724

   @liuyongvs I disagree: I think that we're looking at what the definition of 
INTERSECT in general is, not from a functional or implementation perspective, 
but more if there's a definition of what INTERSECT should do. I don't think 
it's a good idea to have INTERSECT in Flink that doesn't return duplicates, and 
then have an ARRAY_INTERSECT that does return duplicates. That's not 
consistent. If both INTERSECT and ARRAY_INTERSECT don't return duplicates, that 
is a consistent behavior. 
   
   So IMHO:
   
   INTERSECT and ARRAY_INTERSECT --> Removes duplicates
   
   If there's a need to have duplicates included:
   
   INTERSECT ALL and ARRAY_INTERSECT_ALL --> Keep duplicates, have consistent 
behavior with INTERSECT ALL





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-03-27 Thread via GitHub


liuyongvs commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2024262258

   Hi @MartijnVisser, INTERSECT is different from array_intersect. INTERSECT is 
a set RelNode, like UNION/UNION ALL, which is indeed part of the SQL standard, while 
array_intersect and array_union are just functions, which don't have any standard.





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-03-27 Thread via GitHub


MartijnVisser commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2022804099

   > What is your opinion on how the function should behave?
   
   I've taken a look at how INTERSECT is defined in the SQL standard. Based on 
https://stackoverflow.com/questions/59060599/does-intersect-operator-exist-in-the-sql-standar,
https://www.postgresql.org/docs/current/queries-union.html, and the fact that 
Calcite differentiates between INTERSECT and INTERSECT ALL, I believe 
that the default behavior of INTERSECT is to remove duplicates. 
   
   So the result of INTERSECT on `[1, 1, 1, 2] INTERSECT [1, 1, 2]` should be 
`[1, 2]` in my understanding. I think that Spark/Databricks/Presto are 
performing the correct behavior. 
   
   BigQuery and Redshift don't support ARRAY_INTERSECT. ksqlDB follows the same 
behavior as Spark/Databricks/Presto per 
https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/scalar-functions/#array_intersect.
 





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-03-27 Thread via GitHub


liuyongvs commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2022537218

   hi @snuyanzin @dawidwys what is your opinion?





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-03-25 Thread via GitHub


liuyongvs commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2017314639

   > 
https://clickhouse.com/docs/en/sql-reference/functions/array-functions#arrayintersectarr
   
   From my side, it is not a good idea,
   because array_intersect(array_intersect(array1, array2), array3) 
does the same; it is just syntactic sugar.
   Also, array_union/array_except have already been supported and merged, and both take two 
args, so we may want to align with them. What is your opinion, @dawidwys @MartijnVisser?
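
The "syntactic sugar" point can be sketched in plain Java as a left fold over a two-argument intersection. This is only an illustration under assumed set semantics (no duplicates in the result); the class and method names are hypothetical and are not part of Flink or of this PR.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class NaryIntersectSketch {

    // Two-argument intersection with set semantics; keeps the order of the first list.
    public static <T> List<T> intersect(List<T> a, List<T> b) {
        Set<T> inB = new HashSet<>(b);
        Set<T> out = new LinkedHashSet<>();
        for (T x : a) {
            if (inB.contains(x)) {
                out.add(x);
            }
        }
        return new ArrayList<>(out);
    }

    // n-ary intersection expressed purely through the two-argument version.
    @SafeVarargs
    public static <T> List<T> intersectMany(List<T> first, List<T>... rest) {
        // Intersecting a list with itself only deduplicates it.
        List<T> acc = intersect(first, first);
        for (List<T> next : rest) {
            acc = intersect(acc, next);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Integer> a = List.of(1, 2, 3);
        List<Integer> b = List.of(2, 3, 4);
        List<Integer> c = List.of(3, 2);
        System.out.println(intersectMany(a, b, c));            // prints: [2, 3]
        System.out.println(intersect(intersect(a, b), c));     // prints: [2, 3]
    }
}
```

Both calls give the same result, which is why a variadic signature adds convenience rather than new capability.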


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-03-22 Thread via GitHub


snuyanzin commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2014912924

   One more idea:
   some vendors allow calculating intersections for an arbitrary number of arrays, 
e.g.
   ClickHouse [1]
   [1] 
https://clickhouse.com/docs/en/sql-reference/functions/array-functions#arrayintersectarr





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-03-22 Thread via GitHub


dawidwys commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2014857862

   Before I review the code let's settle on the behaviour first.
   
   @MartijnVisser What is your opinion on how the function should behave? 
Especially in the context of 
https://github.com/apache/flink/pull/23173#discussion_r1491044219 and handling 
duplicates.
   
   What should be the output of: `[1, 1, 1, 2] INTERSECT [1, 1, 2]`?
   1. [1,2] - Spark/Databricks/Presto
   2. [1,1,2] - Snowflake
   3. [1, 1, 1, 2] - (as far as I can tell the current behaviour of the PR)
   
   * [Snowflake](https://docs.snowflake.com/en/sql-reference/functions/array_intersection#usage-notes) has multi-set semantics.
   > If one array has N copies of a value, and the other array has M copies of 
the same value, then the number of copies in the returned array is the smaller 
of N or M. For example, if N is 4 and M is 2, then the returned value contains 
2 copies.
   
   * [Databricks](https://docs.databricks.com/en/sql/language-manual/functions/array_intersect.html#returns) deduplicates result ([Spark](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.array_intersect.html) I presume has the same behaviour)
   > An ARRAY of matching type to array1 with no duplicates and elements 
contained in both array1 and array2.
   
   * [Presto](https://prestodb.io/docs/current/functions/array.html#array_intersect) does the same as Spark:
   > Returns an array of the elements in the intersection of x and y, without 
duplicates.
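
The three candidate behaviours listed above can be compared side by side with a small Java sketch. It is written against plain `java.util` lists rather than Flink's `ArrayData`, the names are hypothetical, and the element order of the multi-set variant is an assumption of this sketch (the quoted Snowflake note only specifies the number of copies).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class IntersectSemanticsSketch {

    // 1. Set semantics: each common value appears once.
    public static <T> List<T> setIntersect(List<T> a, List<T> b) {
        Set<T> inB = new HashSet<>(b);
        Set<T> out = new LinkedHashSet<>();
        for (T x : a) {
            if (inB.contains(x)) {
                out.add(x);
            }
        }
        return new ArrayList<>(out);
    }

    // 2. Multi-set semantics: a value with N and M copies yields min(N, M) copies.
    public static <T> List<T> multisetIntersect(List<T> a, List<T> b) {
        List<T> remaining = new ArrayList<>(b);
        List<T> out = new ArrayList<>();
        for (T x : a) {
            // remove(Object) consumes one matching copy from b, if any is left.
            if (remaining.remove(x)) {
                out.add(x);
            }
        }
        return out;
    }

    // 3. Keep every element of the first array that occurs anywhere in the second.
    public static <T> List<T> keepAllMatching(List<T> a, List<T> b) {
        Set<T> inB = new HashSet<>(b);
        List<T> out = new ArrayList<>();
        for (T x : a) {
            if (inB.contains(x)) {
                out.add(x);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> left = List.of(1, 1, 1, 2);
        List<Integer> right = List.of(1, 1, 2);
        System.out.println(setIntersect(left, right));      // prints: [1, 2]
        System.out.println(multisetIntersect(left, right)); // prints: [1, 1, 2]
        System.out.println(keepAllMatching(left, right));   // prints: [1, 1, 1, 2]
    }
}
```

Note that the third variant is not symmetric: swapping the arguments gives `[1, 1, 2]`, whereas the first two variants return the same multiset of elements regardless of argument order.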





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-03-19 Thread via GitHub


liuyongvs commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2007045444

   hi @dawidwys will you help review this?





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-03-19 Thread via GitHub


flinkbot commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2006346388

   
   ## CI report:
   
   * 55306725ea542f506a4938e2d437e59687dbec13 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-03-19 Thread via GitHub


liuyongvs commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2006330781

   after discussion with @dawidwys  here 
https://github.com/apache/flink/pull/23171#issuecomment-1956501651





[PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-03-19 Thread via GitHub


liuyongvs opened a new pull request, #24526:
URL: https://github.com/apache/flink/pull/24526

   - What is the purpose of the change
   This is an implementation of ARRAY_INTERSECT
   
   - Brief change log
   ARRAY_INTERSECT for Table API and SQL
   
   ```
   Returns an array of the elements in the intersection of array1 and array2, 
with duplicates.
   
   Syntax:
   array_intersect(array1, array2)
   
   Arguments:
   array: An ARRAY to be handled.
   
   Returns:
   An ARRAY. If either array is null, the function returns null.
   Examples:
   
   > SELECT array_intersect(array(1, 1, 3), array(1, 3, 5));
[1,1,3]
   
   ```
   
   - Verifying this change
   This change added tests in CollectionFunctionsITCase.
   
   - Does this pull request potentially affect one of the following parts:
   Dependencies (does it add or upgrade a dependency): ( no)
   The public API, i.e., is any changed class annotated with @Public(Evolving): 
(yes )
   The serializers: (no)
   The runtime per-record code paths (performance sensitive): ( no)
   Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: ( no)
   The S3 file system connector: ( no)
   - Documentation
   Does this pull request introduce a new feature? (yes)
   If yes, how is the feature documented? (docs)
   

