AlanConfluent commented on code in PR #26567:
URL: https://github.com/apache/flink/pull/26567#discussion_r2110383237
##########
docs/layouts/shortcodes/generated/sink_configuration.html:
##########
@@ -0,0 +1,18 @@
+<table class="configuration table table-bordered">
Review Comment:
Yes, it is unrelated. Not sure why it was generated. Let me remove it.
##########
flink-table/flink-table-common/src/test/java/org/apache/flink/table/functions/UserDefinedFunctionHelperTest.java:
##########
@@ -295,32 +327,69 @@ public static class ValidAsyncScalarFunction extends
AsyncScalarFunction {
public void eval(CompletableFuture<Integer> future, int i) {}
}
+ /** Valid table function. */
+ public static class ValidAsyncTableFunction extends
AsyncTableFunction<Integer> {
+ public void eval(CompletableFuture<Collection<Integer>> future, int i)
{}
+ }
+
private static class PrivateAsyncScalarFunction extends
AsyncScalarFunction {
public void eval(CompletableFuture<Integer> future, int i) {}
}
+ private static class PrivateAsyncTableFunction extends
AsyncTableFunction<Integer> {
+ public void eval(CompletableFuture<Collection<Integer>> future, int i)
{}
+ }
+
/** No implementation method. */
public static class MissingImplementationAsyncScalarFunction extends
AsyncScalarFunction {
// nothing to do
}
+ /** No implementation method. */
+ public static class MissingImplementationAsyncTableFunction
+ extends AsyncTableFunction<Integer> {
+ // nothing to do
+ }
+
/** Implementation method is private. */
public static class PrivateMethodAsyncScalarFunction extends
AsyncScalarFunction {
private void eval(CompletableFuture<Integer> future, int i) {}
}
+ /** Implementation method is private. */
+ public static class PrivateMethodAsyncTableFunction extends
AsyncTableFunction<Integer> {
+ private void eval(CompletableFuture<Collection<Integer>> future, int
i) {}
+ }
+
/** Implementation method isn't void. */
public static class NonVoidAsyncScalarFunction extends AsyncScalarFunction
{
public String eval(CompletableFuture<Integer> future, int i) {
return "";
}
}
+ /** Implementation method isn't void. */
+ public static class NonVoidAsyncTableFunction extends AsyncScalarFunction {
+ public String eval(CompletableFuture<Collection<Integer>> future, int
i) {
+ return "";
+ }
+ }
+
/** Implementation method isn't void. */
public static class NoFutureAsyncScalarFunction extends
AsyncScalarFunction {
public void eval(int i) {}
}
+ /** Implementation method isn't void. */
Review Comment:
Updated comment to be correct
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -443,6 +443,49 @@ public class ExecutionConfigOptions {
"The max number of async retry attempts to make
before task "
+ "execution is failed.");
+ // ------------------------------------------------------------------------
+ // Async Table Function
+ // ------------------------------------------------------------------------
+ @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+ public static final ConfigOption<Integer>
TABLE_EXEC_ASYNC_TABLE_BUFFER_CAPACITY =
+ key("table.exec.async-table.buffer-capacity")
+ .intType()
+ .defaultValue(10)
+ .withDescription(
+ "The max number of async i/o operations that the
async table function can trigger.");
+
+ @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+ public static final ConfigOption<Duration> TABLE_EXEC_ASYNC_TABLE_TIMEOUT =
+ key("table.exec.async-table.timeout")
+ .durationType()
+ .defaultValue(Duration.ofMinutes(3))
+ .withDescription(
+ "The async timeout for the asynchronous operation
to complete.");
Review Comment:
It's for all retries to complete. Let me update the description to be
clearer.
##########
flink-table/flink-table-common/src/test/java/org/apache/flink/table/functions/UserDefinedFunctionHelperTest.java:
##########
@@ -295,32 +327,69 @@ public static class ValidAsyncScalarFunction extends
AsyncScalarFunction {
public void eval(CompletableFuture<Integer> future, int i) {}
}
+ /** Valid table function. */
+ public static class ValidAsyncTableFunction extends
AsyncTableFunction<Integer> {
+ public void eval(CompletableFuture<Collection<Integer>> future, int i)
{}
+ }
+
private static class PrivateAsyncScalarFunction extends
AsyncScalarFunction {
public void eval(CompletableFuture<Integer> future, int i) {}
}
+ private static class PrivateAsyncTableFunction extends
AsyncTableFunction<Integer> {
+ public void eval(CompletableFuture<Collection<Integer>> future, int i)
{}
+ }
+
/** No implementation method. */
public static class MissingImplementationAsyncScalarFunction extends
AsyncScalarFunction {
// nothing to do
}
+ /** No implementation method. */
+ public static class MissingImplementationAsyncTableFunction
+ extends AsyncTableFunction<Integer> {
+ // nothing to do
+ }
+
/** Implementation method is private. */
public static class PrivateMethodAsyncScalarFunction extends
AsyncScalarFunction {
private void eval(CompletableFuture<Integer> future, int i) {}
}
+ /** Implementation method is private. */
+ public static class PrivateMethodAsyncTableFunction extends
AsyncTableFunction<Integer> {
+ private void eval(CompletableFuture<Collection<Integer>> future, int
i) {}
+ }
+
/** Implementation method isn't void. */
public static class NonVoidAsyncScalarFunction extends AsyncScalarFunction
{
public String eval(CompletableFuture<Integer> future, int i) {
return "";
}
}
+ /** Implementation method isn't void. */
+ public static class NonVoidAsyncTableFunction extends AsyncScalarFunction {
+ public String eval(CompletableFuture<Collection<Integer>> future, int
i) {
+ return "";
+ }
+ }
+
/** Implementation method isn't void. */
public static class NoFutureAsyncScalarFunction extends
AsyncScalarFunction {
public void eval(int i) {}
}
+ /** Implementation method isn't void. */
+ public static class NoFutureAsyncTableFunction extends
AsyncTableFunction<Integer> {
+ public void eval(int i) {}
+ }
+
+ /** Implementation method isn't void. */
Review Comment:
Updated comment to be correct
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRule.java:
##########
@@ -71,24 +72,35 @@ public class AsyncCalcSplitRule {
* org.apache.flink.table.functions.AsyncScalarFunction}.
*/
public static class AsyncRemoteCalcCallFinder implements
RemoteCalcCallFinder {
+
+ private final FunctionKind functionKind;
+
+ public AsyncRemoteCalcCallFinder() {
+ this(FunctionKind.ASYNC_SCALAR);
+ }
Review Comment:
Good point. Removed the default constructor and replaced the existing with
the explicit scalar constructor.
##########
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java:
##########
@@ -373,4 +374,28 @@ public static void validateLambdaType(Class<?> baseClass,
Type t) {
+ "Otherwise the type has to be specified
explicitly using type information.");
}
}
+
+ /**
+ * Will return true if the type of the given generic class type.
Review Comment:
Done
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRule.java:
##########
@@ -71,24 +72,35 @@ public class AsyncCalcSplitRule {
* org.apache.flink.table.functions.AsyncScalarFunction}.
Review Comment:
Good call. Updated.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecAsyncCorrelate.java:
##########
@@ -0,0 +1,93 @@
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCorrelate;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.types.logical.RowType;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.Collections;
+import java.util.List;
+
+import static
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCorrelate.ASYNC_CORRELATE_TRANSFORMATION;
+
+/**
+ * Stream {@link ExecNode} which matches along with join a Java/Scala user
defined table function.
+ */
+@ExecNodeMetadata(
+ name = "stream-exec-async-correlate",
+ version = 1,
+ producedTransformations = ASYNC_CORRELATE_TRANSFORMATION,
+ minPlanVersion = FlinkVersion.v1_19,
+ minStateVersion = FlinkVersion.v1_19)
Review Comment:
Ah, good catch. Updated to be V2_1, which I believe will include this
change.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.java:
##########
@@ -82,4 +90,31 @@ public void testCorrelateIndirectOtherWay() {
String sqlQuery = "select * FROM MyTable, LATERAL
TABLE(tableFunc(func1(ABS(a))))";
util.verifyRelPlan(sqlQuery);
}
+
+ @Test
+ public void testCorrelateWithSystem() {
+ String sqlQuery = "select * FROM MyTable, LATERAL
TABLE(asyncTableFunc(ABS(a)))";
+ util.verifyRelPlan(sqlQuery);
+ }
+
+ @Test
+ public void testCorrelateWithScalar() {
+ String sqlQuery = "select * FROM MyTable, LATERAL
TABLE(asyncTableFunc(scalar(a)))";
+ util.verifyRelPlan(sqlQuery);
+ }
+
Review Comment:
Added that test and the fix.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]