This is an automated email from the ASF dual-hosted git repository.
martijnvisser pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.0 by this push:
new 702a985e993 [FLINK-35117][Table SQL/Planner] Add commons-text to allowed classloader list. This closes #26390 (#26401)
702a985e993 is described below
commit 702a985e993fcd926dea28e42dc17d29ce09ab63
Author: Martijn Visser <[email protected]>
AuthorDate: Sat Apr 5 10:56:34 2025 +0200
[FLINK-35117][Table SQL/Planner] Add commons-text to allowed classloader list. This closes #26390 (#26401)
* [FLINK-35117][Table SQL/Planner] Add commons-text to allowed classloader list
This dependency is already in the main POM, like commons-lang3 and commons-math3
* [FLINK-35117][Table SQL/Planner] Add E2E test
(cherry picked from commit bec57c16c719e2bb0045757a183e114691eca0e7)
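For background on the first change: the planner loader resolves most classes from the bundled planner jar, but delegates an allow-listed set of package prefixes to the owner classloader, where dependencies declared in the main POM are available. A rough sketch of that owner-first delegation idea, with hypothetical class and field names (this is not Flink's actual PlannerModule implementation):

    import java.util.List;

    class OwnerFirstClassLoaderSketch extends ClassLoader {
        // Package prefixes delegated to the owner classloader; this commit
        // adds "org.apache.commons.text" next to the existing commons entries.
        private static final List<String> OWNER_FIRST_PREFIXES =
                List.of(
                        "org.apache.commons.lang3",
                        "org.apache.commons.math3",
                        "org.apache.commons.text");

        OwnerFirstClassLoaderSketch(ClassLoader owner) {
            super(owner);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve)
                throws ClassNotFoundException {
            if (OWNER_FIRST_PREFIXES.stream().anyMatch(name::startsWith)) {
                // Resolved via the owner, e.g. commons-text from the main POM.
                return super.loadClass(name, resolve);
            }
            // Anything else must come from the component's own bundled jars;
            // commons-text is not bundled with the planner, which is why
            // org.apache.commons.text.StringSubstitutor previously failed
            // with NoClassDefFoundError.
            throw new ClassNotFoundException(name);
        }
    }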
---
.../flink-end-to-end-tests-table-api/pom.xml | 107 +++++++++++
.../test/async/AsyncScalarFunctionExample.java | 205 +++++++++++++++++++++
.../table/test/async/AsyncScalarFunctionTest.java | 39 ++++
flink-end-to-end-tests/pom.xml | 2 +
.../flink/table/planner/loader/PlannerModule.java | 1 +
5 files changed, 354 insertions(+)
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/pom.xml
new file mode 100644
index 00000000000..17bc0957959
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/pom.xml
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-end-to-end-tests</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>2.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-end-to-end-tests-table-api</artifactId>
+ <name>Flink : E2E Tests : Async Function Table API</name>
+
+ <dependencies>
+ <!-- Flink dependencies -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Flink Table API -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-loader</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Logging -->
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+
+ <!-- Test Dependencies -->
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <version>${junit5.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <version>${junit5.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Flink Test Utils -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+</project>
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/async/AsyncScalarFunctionExample.java b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/async/AsyncScalarFunctionExample.java
new file mode 100644
index 00000000000..312b17e64c7
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/async/AsyncScalarFunctionExample.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.async;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableDescriptor;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.functions.AsyncScalarFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
+
+/** Example application that tests AsyncScalarFunction with Table API. */
+public class AsyncScalarFunctionExample {
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncScalarFunctionExample.class);
+
+ /**
+ * Sets up and executes the Flink processing pipeline.
+ *
+ * @return TableResult from the execution
+ */
+ public TableResult execute() throws Exception {
+ LOG.info("Starting AsyncScalarFunctionExample");
+
+ // Set up Table Environment
+ EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
+ TableEnvironment tableEnv = TableEnvironment.create(settings);
+
+ // Register source table
+ createSourceTable(tableEnv);
+
+ // Register sink table
+ createSinkTable(tableEnv);
+
+ // Register the lookup function
+ registerLookupFunction(tableEnv);
+
+ // Create and execute the transformation using Table API
+ TableResult result = processProducts(tableEnv);
+
+ LOG.info("Executing Flink job");
+
+ return result;
+ }
+
+ /** Creates the source table using datagen connector. */
+ protected void createSourceTable(TableEnvironment tableEnv) {
+ final Schema schema =
+ Schema.newBuilder()
+ .column("product_id", DataTypes.STRING())
+ .column("price", DataTypes.DECIMAL(10, 2))
+ .column("quantity", DataTypes.INT())
+ .column("ts", DataTypes.TIMESTAMP(3))
+ .watermark("ts", "ts - INTERVAL '5' SECOND")
+ .build();
+
+ // Create a temporary mock products table
+ tableEnv.createTemporaryTable(
+ "products",
+ TableDescriptor.forConnector("datagen")
+ .schema(schema)
+ .option("number-of-rows", "10")
+ .option("fields.product_id.kind", "sequence")
+ .option("fields.product_id.start", "1")
+ .option("fields.product_id.end", "10")
+ .option("fields.price.min", "10.00")
+ .option("fields.price.max", "100.00")
+ .option("fields.quantity.min", "1")
+ .option("fields.quantity.max", "10")
+ .build());
+
+ LOG.info("Source table 'products' created");
+ }
+
+ /** Creates the sink table using blackhole connector. */
+ protected void createSinkTable(TableEnvironment tableEnv) {
+ tableEnv.createTemporaryTable(
+ "enriched_products",
+ TableDescriptor.forConnector("blackhole")
+ .schema(
+ Schema.newBuilder()
+ .column("product_id",
DataTypes.STRING())
+ .column("price", DataTypes.DECIMAL(10,
2))
+ .column("quantity", DataTypes.INT())
+ .column("timestamp",
DataTypes.TIMESTAMP(3))
+ .column("name", DataTypes.STRING())
+ .build())
+ .build());
+
+ LOG.info("Sink table 'enriched_products' created");
+ }
+
+ /** Registers the async lookup function. */
+ protected void registerLookupFunction(TableEnvironment tableEnv) {
+ tableEnv.createTemporaryFunction("lookup_name", new MockAsyncLookupFunction("name"));
+ LOG.info("Registered async lookup function");
+ }
+
+ /**
+ * Processes products data using the Table API: performs async lookups, enriches the rows
+ * with product details, and inserts them into the sink table.
+ *
+ * @return TableResult from the execution
+ */
+ protected TableResult processProducts(TableEnvironment tableEnv) {
+ Table products = tableEnv.from("products");
+ Table enrichedProducts =
+ products.select(
+ $("product_id"),
+ $("price"),
+ $("quantity"),
+ $("ts").as("timestamp"),
+ call("lookup_name", $("product_id")).as("name"));
+
+ // Execute the query - without the classloader fix this fails with NoClassDefFoundError for StringSubstitutor
+ TableResult result = enrichedProducts.executeInsert("enriched_products");
+
+ LOG.info("Product enrichment transformation created");
+
+ return result;
+ }
+
+ /** Application entry point. */
+ public static void main(String[] args) throws Exception {
+ AsyncScalarFunctionExample example = new AsyncScalarFunctionExample();
+
+ try {
+ TableResult result = example.execute();
+ result.await();
+
+ LOG.info("Job completed successfully");
+ } catch (Throwable t) {
+ // Log the exception chain to find the root cause
+ LOG.error("Job failed with exception", t);
+ }
+ }
+
+ /** Mock implementation of an AsyncScalarFunction that simulates async lookups. */
+ public static class MockAsyncLookupFunction extends AsyncScalarFunction {
+ private static final long serialVersionUID = 1L;
+
+ private final String fieldType;
+ private transient ExecutorService executorService;
+
+ public MockAsyncLookupFunction(String fieldType) {
+ this.fieldType = fieldType;
+ }
+
+ @Override
+ public void open(FunctionContext context) throws Exception {
+ executorService =
+ Executors.newFixedThreadPool(2, new ExecutorThreadFactory("mock-lookup"));
+ }
+
+ // The correct implementation for AsyncScalarFunction.eval()
+ // - Must return void
+ // - Must take a CompletableFuture as the first parameter
+ public void eval(CompletableFuture<String> resultFuture, String key) {
+ executorService.submit(
+ () -> {
+ resultFuture.complete("Product " + key);
+ });
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (executorService != null) {
+ executorService.shutdown();
+ if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+ executorService.shutdownNow();
+ }
+ }
+ }
+ }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/test/java/org/apache/flink/table/test/async/AsyncScalarFunctionTest.java b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/test/java/org/apache/flink/table/test/async/AsyncScalarFunctionTest.java
new file mode 100644
index 00000000000..b527d11c29d
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/test/java/org/apache/flink/table/test/async/AsyncScalarFunctionTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.async;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test that runs the AsyncScalarFunctionExample.
+ *
+ * <p>This test verifies that AsyncScalarFunction works correctly. The test passes if the
+ * application runs without errors.
+ */
+public class AsyncScalarFunctionTest {
+
+ @Test
+ public void testAsyncScalarFunction() throws Exception {
+ // Create and run the example application
+ AsyncScalarFunctionExample example = new AsyncScalarFunctionExample();
+ example.execute();
+
+ // If we reach here without exceptions, the test passes
+ }
+}
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index c0790e735f9..9f8a6eebe49 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -71,6 +71,8 @@ under the License.
<module>flink-end-to-end-tests-jdbc-driver</module>
<module>flink-end-to-end-tests-restclient</module>
<module>flink-failure-enricher-test</module>
+ <module>flink-end-to-end-tests-table-api</module>
+
</modules>
<dependencies>
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
index bc3b004bc0c..289a3f23fa6 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
@@ -72,6 +72,7 @@ public class PlannerModule {
"org.codehaus.commons",
"org.apache.commons.lang3",
"org.apache.commons.math3",
+ "org.apache.commons.text",
// with hive dialect, hadoop jar should be in classpath,
// also, we should make it loaded by owner classloader,
// otherwise, it'll throw class not found exception
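For reference, the eval contract that the new E2E example exercises, reduced to a minimal sketch; the UppercaseAsync class below is illustrative and not part of this commit:

    import java.util.concurrent.CompletableFuture;

    import org.apache.flink.table.functions.AsyncScalarFunction;

    /** Minimal shape of an async scalar function. */
    public class UppercaseAsync extends AsyncScalarFunction {
        // eval returns void; the result (or an exception) is delivered
        // through the CompletableFuture passed as the first parameter.
        public void eval(CompletableFuture<String> result, String input) {
            result.complete(input == null ? null : input.toUpperCase());
        }
    }

Such a function is registered with tableEnv.createTemporaryFunction(...) and invoked with call(...), exactly as the example above does with MockAsyncLookupFunction.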