yuxiqian commented on code in PR #4241:
URL: https://github.com/apache/flink-cdc/pull/4241#discussion_r2753288316
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java:
##########
@@ -398,6 +398,12 @@ public SqlSyntax getSyntax() {
// --------------
public static final SqlFunction CAST = SqlStdOperatorTable.CAST;
+ // ---------------------
+ // Collection Functions
+ // ---------------------
+ // Supports array[index] and map[key] syntax
+ public static final SqlOperator ITEM = SqlStdOperatorTable.ITEM;
Review Comment:
Please update the documentation for this as well.
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java:
##########
@@ -468,6 +470,37 @@ private static Java.Rvalue generateOtherOperation(
throw new ParseException("Unrecognized expression: " +
sqlBasicCall.toString());
}
+ private static Java.Rvalue generateItemAccessOperation(
+ Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
+ if (atoms.length != 2) {
+ throw new ParseException("Unrecognized item access expression: " +
sqlBasicCall);
+ }
+ Java.Rvalue methodInvocation =
+ new Java.MethodInvocation(Location.NOWHERE, null,
"itemAccess", atoms);
+
+ // Deduce the return type and add a cast to ensure proper type conversion
+ DataType resultType =
+ TransformParser.deduceSubExpressionType(
+ context.columns,
+ sqlBasicCall,
+ context.udfDescriptors,
+ context.supportedMetadataColumns);
+
+ // Get the Java class for the result type and add a cast
+ Class<?> javaClass = JavaClassConverter.toJavaClass(resultType);
+ if (javaClass != null && javaClass != Object.class) {
+ return new Java.Cast(
+ Location.NOWHERE,
+ new Java.ReferenceType(
+ Location.NOWHERE,
+ new Java.Annotation[0],
+ javaClass.getName().split("\\."),
Review Comment:
Shall we use `getCanonicalName()` here? Adding a test that indexes an `ARRAY<BYTE>` column could verify this.
```
jshell> byte[].class.getName()
$1 ==> "[B"
jshell> byte[].class.getCanonicalName()
$2 ==> "byte[]"
```
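To illustrate the difference, here is a minimal standalone sketch using only the JDK. `ClassNameDemo` and `identifiers` are hypothetical names; `identifiers` just mirrors the `getName().split("\\.")` call in the PR. For nested classes the binary name uses `$`, so splitting it yields `Map$Entry` instead of `Map`, `Entry`. For arrays, even the canonical name keeps its `[]` suffix, so the suggested `ARRAY<BYTE>` test is still worth adding to see how the generated cast behaves.

```java
import java.util.Arrays;
import java.util.Map;

public class ClassNameDemo {

    // Splits a class name into a dotted identifier chain, mirroring
    // `javaClass.getName().split("\\.")` from the PR (hypothetical helper).
    static String[] identifiers(String className) {
        return className.split("\\.");
    }

    public static void main(String[] args) {
        // Binary names: arrays use JVM descriptors, nested classes use '$'.
        System.out.println(byte[].class.getName());             // [B
        System.out.println(Map.Entry.class.getName());          // java.util.Map$Entry

        // Canonical names: the form you would write in Java source.
        System.out.println(byte[].class.getCanonicalName());    // byte[]
        System.out.println(Map.Entry.class.getCanonicalName()); // java.util.Map.Entry

        // Identifier chains produced by splitting on '.'.
        System.out.println(Arrays.toString(identifiers(Map.Entry.class.getName())));
        // [java, util, Map$Entry]
        System.out.println(Arrays.toString(identifiers(Map.Entry.class.getCanonicalName())));
        // [java, util, Map, Entry]
    }
}
```

For ordinary top-level classes such as `java.lang.String`, both methods return the same string, which is why the difference only surfaces for nested or array result types.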
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctions.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.functions.impl;
+
+import java.util.List;
+import java.util.Map;
+
+/** Struct built-in functions. */
+public class StructFunctions {
+
+ /**
+ * Accesses an element from an ARRAY or MAP by index or key.
+ *
+ * <p>For ARRAY: Uses 1-based index (SQL standard). array[1] returns the first element.
+ *
+ * <p>For MAP: Uses key to access the value. map['key'] returns the value for 'key'.
+ */
+ public static Object itemAccess(Object collection, Object indexOrKey) {
+ if (collection == null || indexOrKey == null) {
+ return null;
+ }
+
+ Object result;
+ if (collection instanceof List) {
+ result = arrayElement((List) collection, indexOrKey);
+ } else if (collection instanceof Map) {
+ result = mapValue((Map<?, ?>) collection, indexOrKey);
+ } else {
+ throw new IllegalArgumentException(
+ "itemAccess only supports List or Map, but got: "
+ + collection.getClass().getName());
+ }
+ return result;
+ }
+
+ /**
+ * Gets an element from an Object array by index (1-based, SQL standard). This overload handles
+ * arrays that have been converted from ArrayData to Object[] by DataTypeConverter.
+ *
+ * @param array the Object array to access
+ * @param index the 1-based index
+ * @return the element at the specified index, or null if index is out of bounds
+ */
+ public static Object arrayElement(List array, Object index) {
+ if (array == null || index == null) {
+ return null;
+ }
+
+ int idx;
+ if (index instanceof Number) {
+ idx = ((Number) index).intValue();
+ } else {
+ idx = Integer.parseInt(index.toString());
+ }
+
+ // Convert 1-based index to 0-based (SQL standard uses 1-based indexing)
+ int zeroBasedIndex = idx - 1;
+
+ // Check bounds
+ if (zeroBasedIndex < 0 || zeroBasedIndex >= array.size()) {
+ return null;
+ }
+
+ return array.get(zeroBasedIndex);
+ }
+
+ /**
+ * Gets a value from a Map by key.
+ *
+ * @param map the Map to access
+ * @param key the key to look up
+ * @return the value for the specified key, or null if not found
+ */
+ public static Object mapValue(Map<?, ?> map, Object key) {
Review Comment:
Since Yanquan has introduced Variant support in the pipeline, shall we add similar item subscript methods for `Variant` data too?
```java
public static Variant itemSubscript(Variant variant, Integer index);
public static Variant itemSubscript(Variant variant, String key);
```
It's fine if you'd prefer to support it in a separate ticket.
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java:
##########
@@ -93,7 +93,7 @@
public class TransformParser {
private static final Logger LOG =
LoggerFactory.getLogger(TransformParser.class);
private static final String DEFAULT_SCHEMA = "default_schema";
- private static final String DEFAULT_TABLE = "TB";
+ public static final String DEFAULT_TABLE = "TB";
Review Comment:
Is making this field public still necessary?
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctions.java:
##########
@@ -0,0 +1,96 @@
+ public static Object itemAccess(Object collection, Object indexOrKey) {
+ if (collection == null || indexOrKey == null) {
+ return null;
+ }
+
+ Object result;
+ if (collection instanceof List) {
+ result = arrayElement((List) collection, indexOrKey);
+ } else if (collection instanceof Map) {
+ result = mapValue((Map<?, ?>) collection, indexOrKey);
Review Comment:
It seems we could declare the item subscript functions as overloads instead:
```java
public static <T> T itemAccess(List<T> array, Integer index);
public static <K, V> V itemAccess(Map<K, V> map, K key);
```
Then Java's overload resolution can pick the correct method at compile time, instead of dispatching dynamically at runtime.
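A minimal sketch of that overload-based shape, using plain JDK collections. It keeps the semantics of the existing `arrayElement` and `mapValue` helpers (1-based indexes; `null` for null inputs, out-of-range indexes, or missing keys). `ItemAccessOverloads` is a hypothetical class name. javac resolves the overloads from the static argument types as shown here; since the generated expressions are compiled by Janino, it would be worth confirming the same resolution there.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ItemAccessOverloads {

    /** ARRAY[index]: 1-based SQL index; null for null inputs or an out-of-range index. */
    public static <T> T itemAccess(List<T> array, Integer index) {
        if (array == null || index == null) {
            return null;
        }
        int zeroBased = index - 1; // SQL arrays are 1-based
        if (zeroBased < 0 || zeroBased >= array.size()) {
            return null;
        }
        return array.get(zeroBased);
    }

    /** MAP[key]: null for null inputs or a missing key. */
    public static <K, V> V itemAccess(Map<K, V> map, K key) {
        if (map == null || key == null) {
            return null;
        }
        return map.get(key);
    }

    public static void main(String[] args) {
        List<String> array = Arrays.asList("a", "b", "c");
        Map<String, Integer> map = new HashMap<>();
        map.put("k", 42);

        // The compiler selects the List or Map overload from the static argument types.
        String first = itemAccess(array, 1);  // "a"
        Integer value = itemAccess(map, "k"); // 42
        System.out.println(first + " " + value);  // a 42
        System.out.println(itemAccess(array, 4)); // null (out of range)
    }
}
```

A side benefit of this shape is that the return type is already `T` or `V`, so the caller gets a typed result without a separate cast for the common cases.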
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java:
##########
@@ -468,6 +470,37 @@ private static Java.Rvalue generateOtherOperation(
throw new ParseException("Unrecognized expression: " +
sqlBasicCall.toString());
}
+ private static Java.Rvalue generateItemAccessOperation(
+ Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
+ if (atoms.length != 2) {
+ throw new ParseException("Unrecognized item access expression: " +
sqlBasicCall);
+ }
Review Comment:
```suggestion
Preconditions.checkArgument(
atoms.length == 2,
"Expecting item access call %s to have 2 operands, but got %s",
sqlBasicCall,
List.of(atoms));
```
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java:
##########
@@ -398,6 +398,12 @@ public SqlSyntax getSyntax() {
// --------------
public static final SqlFunction CAST = SqlStdOperatorTable.CAST;
+ // ---------------------
+ // Collection Functions
+ // ---------------------
+ // Supports array[index] and map[key] syntax
Review Comment:
```suggestion
// Supports accessing elements of ARRAY[index], ROW[index] and MAP[key]
```
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java:
##########
@@ -398,6 +398,12 @@ public SqlSyntax getSyntax() {
// --------------
public static final SqlFunction CAST = SqlStdOperatorTable.CAST;
+ // ---------------------
+ // Collection Functions
Review Comment:
It would be nice to use one consistent name, either "Collection Functions" or "Struct Functions" (the new class is named `StructFunctions`).