lihaosky commented on code in PR #26567:
URL: https://github.com/apache/flink/pull/26567#discussion_r2103359338
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -443,6 +443,49 @@ public class ExecutionConfigOptions {
"The max number of async retry attempts to make before task "
+ "execution is failed.");
+ // ------------------------------------------------------------------------
+ // Async Table Function
+ // ------------------------------------------------------------------------
+ @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+ public static final ConfigOption<Integer> TABLE_EXEC_ASYNC_TABLE_BUFFER_CAPACITY =
+ key("table.exec.async-table.buffer-capacity")
+ .intType()
+ .defaultValue(10)
+ .withDescription(
+ "The max number of async i/o operations that the async table function can trigger.");
+
+ @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+ public static final ConfigOption<Duration> TABLE_EXEC_ASYNC_TABLE_TIMEOUT =
+ key("table.exec.async-table.timeout")
+ .durationType()
+ .defaultValue(Duration.ofMinutes(3))
+ .withDescription(
+ "The async timeout for the asynchronous operation to complete.");
Review Comment:
Is this the timeout for each retry attempt, or the total timeout across all
retries? I suppose it's per retry. Either way, the description should say so.
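To make the two readings concrete, here is a minimal sketch of the worst-case wait under each one. The class name, the method names, and the attempt count used below are hypothetical, and retry delays are ignored; it does not say which reading Flink actually implements.

```java
import java.time.Duration;

/** Hypothetical helper contrasting the two possible meanings of the timeout option. */
final class TimeoutSemanticsSketch {

    /** Worst case if the timeout bounds each attempt separately. */
    static Duration worstCasePerAttempt(Duration timeout, int maxAttempts) {
        return timeout.multipliedBy(maxAttempts);
    }

    /** Worst case if the timeout bounds the whole operation, all retries included. */
    static Duration worstCaseTotal(Duration timeout, int maxAttempts) {
        return timeout;
    }
}
```

With the default of 3 minutes and, say, 3 attempts, the first reading lets a record wait up to 9 minutes, while the second caps it at 3 minutes.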
##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java:
##########
@@ -373,4 +374,28 @@ public static void validateLambdaType(Class<?> baseClass, Type t) {
+ "Otherwise the type has to be specified explicitly using type information.");
}
}
+
+ /**
+ * Will return true if the type of the given generic class type.
+ *
+ * @param clazz The generic class to check against
+ * @param type The type to be checked
+ */
+ public static boolean isGenericOfClass(Class<?> clazz, Type type) {
+ Optional<ParameterizedType> parameterized = getParameterizedType(type);
+ return clazz.equals(type)
+ || parameterized.isPresent() && clazz.equals(parameterized.get().getRawType());
+ }
+
+ /**
+ * Returns an optional of a ParameterizedType, if that's what the type is.
+ *
+ * @param type The type to check
+ * @return optional which is present if the type is a ParameterizedType
+ */
+ public static Optional<ParameterizedType> getParameterizedType(Type type) {
Review Comment:
Can you add unit tests for these two functions?
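A minimal sketch of what such tests could check, written as a stand-alone copy so it runs without Flink. The body of `getParameterizedType` is not shown in the hunk, so the `instanceof` implementation here is an assumption, and `TypeHelpersSketch` and `sample` are hypothetical names.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/** Stand-alone copy of the two helpers from the diff, for illustration only. */
final class TypeHelpersSketch {

    // Assumed implementation: the hunk only shows the signature.
    static Optional<ParameterizedType> getParameterizedType(Type type) {
        return type instanceof ParameterizedType
                ? Optional.of((ParameterizedType) type)
                : Optional.empty();
    }

    // Same logic as the diff: match the raw class itself or a parameterization of it.
    static boolean isGenericOfClass(Class<?> clazz, Type type) {
        Optional<ParameterizedType> parameterized = getParameterizedType(type);
        return clazz.equals(type)
                || (parameterized.isPresent()
                        && clazz.equals(parameterized.get().getRawType()));
    }

    // Hypothetical fixture: its first parameter gives us a generic type to reflect on.
    static void sample(CompletableFuture<Collection<Integer>> future, int i) {}

    static Type firstParameterOfSample() {
        try {
            return TypeHelpersSketch.class
                    .getDeclaredMethod("sample", CompletableFuture.class, int.class)
                    .getGenericParameterTypes()[0];
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The cases worth covering are a raw class matching itself, a parameterized type matching its raw class, and a mismatch such as `Collection` against `CompletableFuture<Collection<Integer>>`.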
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.java:
##########
@@ -82,4 +90,31 @@ public void testCorrelateIndirectOtherWay() {
String sqlQuery = "select * FROM MyTable, LATERAL TABLE(tableFunc(func1(ABS(a))))";
util.verifyRelPlan(sqlQuery);
}
+
+ @Test
+ public void testCorrelateWithSystem() {
+ String sqlQuery = "select * FROM MyTable, LATERAL TABLE(asyncTableFunc(ABS(a)))";
+ util.verifyRelPlan(sqlQuery);
+ }
+
+ @Test
+ public void testCorrelateWithScalar() {
+ String sqlQuery = "select * FROM MyTable, LATERAL TABLE(asyncTableFunc(scalar(a)))";
+ util.verifyRelPlan(sqlQuery);
+ }
+
Review Comment:
Can you add
```
@Test
public void testCorrelateWithCast() {
String sqlQuery =
"select * FROM MyTable, LATERAL TABLE(asyncTableFunc(cast(cast(a as int) as int)))";
util.verifyRelPlan(sqlQuery);
}
```
I made a fix internally around cast handling, so this case is worth covering.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRule.java:
##########
@@ -71,24 +72,35 @@ public class AsyncCalcSplitRule {
* org.apache.flink.table.functions.AsyncScalarFunction}.
Review Comment:
Should this also mention `AsyncTableFunction`?
##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java:
##########
@@ -373,4 +374,28 @@ public static void validateLambdaType(Class<?> baseClass, Type t) {
+ "Otherwise the type has to be specified explicitly using type information.");
}
}
+
+ /**
+ * Will return true if the type of the given generic class type.
Review Comment:
```suggestion
* Will return true if the type of the given generic class type matches clazz.
```
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecAsyncCorrelate.java:
##########
@@ -0,0 +1,93 @@
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCorrelate;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCorrelate.ASYNC_CORRELATE_TRANSFORMATION;
+
+/**
+ * Stream {@link ExecNode} which matches along with join a Java/Scala user defined table function.
+ */
+@ExecNodeMetadata(
+ name = "stream-exec-async-correlate",
+ version = 1,
+ producedTransformations = ASYNC_CORRELATE_TRANSFORMATION,
+ minPlanVersion = FlinkVersion.v1_19,
+ minStateVersion = FlinkVersion.v1_19)
Review Comment:
This node doesn't exist yet in 1.19, does it?
##########
flink-table/flink-table-common/src/test/java/org/apache/flink/table/functions/UserDefinedFunctionHelperTest.java:
##########
@@ -295,32 +327,69 @@ public static class ValidAsyncScalarFunction extends AsyncScalarFunction {
public void eval(CompletableFuture<Integer> future, int i) {}
}
+ /** Valid table function. */
+ public static class ValidAsyncTableFunction extends AsyncTableFunction<Integer> {
+ public void eval(CompletableFuture<Collection<Integer>> future, int i) {}
+ }
+
private static class PrivateAsyncScalarFunction extends AsyncScalarFunction {
public void eval(CompletableFuture<Integer> future, int i) {}
}
+ private static class PrivateAsyncTableFunction extends AsyncTableFunction<Integer> {
+ public void eval(CompletableFuture<Collection<Integer>> future, int i) {}
+ }
+
/** No implementation method. */
public static class MissingImplementationAsyncScalarFunction extends AsyncScalarFunction {
// nothing to do
}
+ /** No implementation method. */
+ public static class MissingImplementationAsyncTableFunction
+ extends AsyncTableFunction<Integer> {
+ // nothing to do
+ }
+
/** Implementation method is private. */
public static class PrivateMethodAsyncScalarFunction extends AsyncScalarFunction {
private void eval(CompletableFuture<Integer> future, int i) {}
}
+ /** Implementation method is private. */
+ public static class PrivateMethodAsyncTableFunction extends AsyncTableFunction<Integer> {
+ private void eval(CompletableFuture<Collection<Integer>> future, int i) {}
+ }
+
/** Implementation method isn't void. */
public static class NonVoidAsyncScalarFunction extends AsyncScalarFunction {
public String eval(CompletableFuture<Integer> future, int i) {
return "";
}
}
+ /** Implementation method isn't void. */
+ public static class NonVoidAsyncTableFunction extends AsyncScalarFunction {
+ public String eval(CompletableFuture<Collection<Integer>> future, int i) {
+ return "";
+ }
+ }
+
/** Implementation method isn't void. */
public static class NoFutureAsyncScalarFunction extends AsyncScalarFunction {
public void eval(int i) {}
}
+ /** Implementation method isn't void. */
Review Comment:
Should this Javadoc say the first param isn't `CompletableFuture<Collection>`?
##########
flink-table/flink-table-common/src/test/java/org/apache/flink/table/functions/UserDefinedFunctionHelperTest.java:
##########
@@ -295,32 +327,69 @@ public static class ValidAsyncScalarFunction extends AsyncScalarFunction {
public void eval(CompletableFuture<Integer> future, int i) {}
}
+ /** Valid table function. */
+ public static class ValidAsyncTableFunction extends AsyncTableFunction<Integer> {
+ public void eval(CompletableFuture<Collection<Integer>> future, int i) {}
+ }
+
private static class PrivateAsyncScalarFunction extends AsyncScalarFunction {
public void eval(CompletableFuture<Integer> future, int i) {}
}
+ private static class PrivateAsyncTableFunction extends AsyncTableFunction<Integer> {
+ public void eval(CompletableFuture<Collection<Integer>> future, int i) {}
+ }
+
/** No implementation method. */
public static class MissingImplementationAsyncScalarFunction extends AsyncScalarFunction {
// nothing to do
}
+ /** No implementation method. */
+ public static class MissingImplementationAsyncTableFunction
+ extends AsyncTableFunction<Integer> {
+ // nothing to do
+ }
+
/** Implementation method is private. */
public static class PrivateMethodAsyncScalarFunction extends AsyncScalarFunction {
private void eval(CompletableFuture<Integer> future, int i) {}
}
+ /** Implementation method is private. */
+ public static class PrivateMethodAsyncTableFunction extends AsyncTableFunction<Integer> {
+ private void eval(CompletableFuture<Collection<Integer>> future, int i) {}
+ }
+
/** Implementation method isn't void. */
public static class NonVoidAsyncScalarFunction extends AsyncScalarFunction {
public String eval(CompletableFuture<Integer> future, int i) {
return "";
}
}
+ /** Implementation method isn't void. */
+ public static class NonVoidAsyncTableFunction extends AsyncScalarFunction {
+ public String eval(CompletableFuture<Collection<Integer>> future, int i) {
+ return "";
+ }
+ }
+
/** Implementation method isn't void. */
public static class NoFutureAsyncScalarFunction extends AsyncScalarFunction {
public void eval(int i) {}
}
+ /** Implementation method isn't void. */
+ public static class NoFutureAsyncTableFunction extends AsyncTableFunction<Integer> {
+ public void eval(int i) {}
+ }
+
+ /** Implementation method isn't void. */
Review Comment:
Ditto: this Javadoc looks copied from the non-void case.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRule.java:
##########
@@ -71,24 +72,35 @@ public class AsyncCalcSplitRule {
* org.apache.flink.table.functions.AsyncScalarFunction}.
*/
public static class AsyncRemoteCalcCallFinder implements RemoteCalcCallFinder {
+
+ private final FunctionKind functionKind;
+
+ public AsyncRemoteCalcCallFinder() {
+ this(FunctionKind.ASYNC_SCALAR);
+ }
Review Comment:
nit: I'm not sure we want to default to the scalar function kind now that both
are supported. Requiring `functionKind` is probably fine.
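A minimal sketch of the shape I have in mind, with the no-arg constructor dropped so every caller states the kind explicitly. `AsyncKindSketch` and `CallFinderSketch` are hypothetical stand-ins for `FunctionKind` and `AsyncRemoteCalcCallFinder`, not the actual classes.

```java
import java.util.Objects;

/** Hypothetical stand-in for FunctionKind, reduced to the two async kinds discussed here. */
enum AsyncKindSketch {
    ASYNC_SCALAR,
    ASYNC_TABLE
}

/** Hypothetical stand-in for the call finder: the kind is required, there is no default. */
final class CallFinderSketch {

    private final AsyncKindSketch functionKind;

    CallFinderSketch(AsyncKindSketch functionKind) {
        // Fail fast instead of silently falling back to the scalar kind.
        this.functionKind = Objects.requireNonNull(functionKind, "functionKind");
    }

    AsyncKindSketch functionKind() {
        return functionKind;
    }
}
```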
##########
docs/layouts/shortcodes/generated/sink_configuration.html:
##########
@@ -0,0 +1,18 @@
+<table class="configuration table table-bordered">
Review Comment:
Is this unrelated to the async table function?