This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bec57c16c71 [FLINK-35117][Table SQL/Planner] Add commons-text to 
allowed classloader list. This closes #26390
bec57c16c71 is described below

commit bec57c16c719e2bb0045757a183e114691eca0e7
Author: Martijn Visser <[email protected]>
AuthorDate: Fri Apr 4 11:12:27 2025 +0200

    [FLINK-35117][Table SQL/Planner] Add commons-text to allowed classloader 
list. This closes #26390
    
    * [FLINK-35117][Table SQL/Planner] Add commons-text to allowed classloader 
list
    
    This dependency is already in the main POM, like commons-lang3 and 
commons-math3
    
    * [FLINK-35117][Table SQL/Planner] Add E2E test
---
 .../flink-end-to-end-tests-table-api/pom.xml       | 107 +++++++++++
 .../test/async/AsyncScalarFunctionExample.java     | 205 +++++++++++++++++++++
 .../table/test/async/AsyncScalarFunctionTest.java  |  39 ++++
 flink-end-to-end-tests/pom.xml                     |   2 +
 .../flink/table/planner/loader/PlannerModule.java  |   1 +
 5 files changed, 354 insertions(+)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/pom.xml 
b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/pom.xml
new file mode 100644
index 00000000000..93765707b18
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/pom.xml
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-end-to-end-tests</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>2.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-end-to-end-tests-table-api</artifactId>
+    <name>Flink : E2E Tests : Async Function Table API</name>
+
+    <dependencies>
+        <!-- Flink dependencies -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- Flink Table API -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-java</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-planner-loader</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-runtime</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- Logging -->
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-api</artifactId>
+                       <version>${log4j.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-core</artifactId>
+                       <version>${log4j.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.logging.log4j</groupId>
+                       <artifactId>log4j-slf4j-impl</artifactId>
+                       <version>${log4j.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.slf4j</groupId>
+                       <artifactId>slf4j-api</artifactId>
+                       <version>${slf4j.version}</version>
+               </dependency>
+
+               <!-- Test Dependencies -->
+               <dependency>
+                       <groupId>org.junit.jupiter</groupId>
+                       <artifactId>junit-jupiter-api</artifactId>
+                       <version>${junit5.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.junit.jupiter</groupId>
+                       <artifactId>junit-jupiter-engine</artifactId>
+                       <version>${junit5.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <!-- Flink Test Utils -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+    </dependencies>
+</project> 
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/async/AsyncScalarFunctionExample.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/async/AsyncScalarFunctionExample.java
new file mode 100644
index 00000000000..312b17e64c7
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/async/AsyncScalarFunctionExample.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.async;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableDescriptor;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.functions.AsyncScalarFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
+
+/** Example application that tests AsyncScalarFunction with Table API. */
+public class AsyncScalarFunctionExample {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AsyncScalarFunctionExample.class);
+
+    /**
+     * Builds the table pipeline end to end and hands it to Flink.
+     *
+     * <p>The steps run in a fixed order: a streaming {@link TableEnvironment} is created, the
+     * source table, the sink table and the lookup function are registered, and finally the
+     * insert is issued through {@link #processProducts(TableEnvironment)}.
+     *
+     * @return the {@link TableResult} returned by {@link #processProducts(TableEnvironment)}
+     * @throws Exception declared for callers; whatever the setup steps throw is propagated
+     */
+    public TableResult execute() throws Exception {
+        LOG.info("Starting AsyncScalarFunctionExample");
+
+        // Streaming-mode environment that all registrations below are made against.
+        final TableEnvironment env =
+                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+
+        // Source, sink and function are registered before the query that references them.
+        createSourceTable(env);
+        createSinkTable(env);
+        registerLookupFunction(env);
+
+        // Builds the projection and issues the insert into the sink table.
+        final TableResult insertResult = processProducts(env);
+
+        LOG.info("Executing Flink job");
+        return insertResult;
+    }
+
+    /**
+     * Registers the temporary {@code products} table, backed by the {@code datagen} connector.
+     *
+     * <p>The connector options request ten rows, a sequential {@code product_id} running from 1
+     * to 10, and min/max bounds for the {@code price} and {@code quantity} fields.
+     *
+     * @param tableEnv environment in which the table is registered
+     */
+    protected void createSourceTable(TableEnvironment tableEnv) {
+        final TableDescriptor productsDescriptor =
+                TableDescriptor.forConnector("datagen")
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("product_id", DataTypes.STRING())
+                                        .column("price", DataTypes.DECIMAL(10, 2))
+                                        .column("quantity", DataTypes.INT())
+                                        .column("ts", DataTypes.TIMESTAMP(3))
+                                        // Watermark is declared five seconds behind ts.
+                                        .watermark("ts", "ts - INTERVAL '5' SECOND")
+                                        .build())
+                        .option("number-of-rows", "10")
+                        .option("fields.product_id.kind", "sequence")
+                        .option("fields.product_id.start", "1")
+                        .option("fields.product_id.end", "10")
+                        .option("fields.price.min", "10.00")
+                        .option("fields.price.max", "100.00")
+                        .option("fields.quantity.min", "1")
+                        .option("fields.quantity.max", "10")
+                        .build();
+
+        // Temporary table: it exists only in this TableEnvironment.
+        tableEnv.createTemporaryTable("products", productsDescriptor);
+
+        LOG.info("Source table 'products' created");
+    }
+
+    /** Creates the sink table using blackhole connector. */
+    protected void createSinkTable(TableEnvironment tableEnv) {
+        tableEnv.createTemporaryTable(
+                "enriched_products",
+                TableDescriptor.forConnector("blackhole")
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("product_id", 
DataTypes.STRING())
+                                        .column("price", DataTypes.DECIMAL(10, 
2))
+                                        .column("quantity", DataTypes.INT())
+                                        .column("timestamp", 
DataTypes.TIMESTAMP(3))
+                                        .column("name", DataTypes.STRING())
+                                        .build())
+                        .build());
+
+        LOG.info("Sink table 'enriched_products' created");
+    }
+
+    /** Registers the async lookup function. */
+    protected void registerLookupFunction(TableEnvironment tableEnv) {
+        tableEnv.createTemporaryFunction("lookup_name", new 
MockAsyncLookupFunction("name"));
+        LOG.info("Registered async lookup function");
+    }
+
+    /**
+     * Processes products data using Table API with async lookups. - Performs 
async lookups -
+     * Enriches the data with product details - Inserts into the sink table
+     *
+     * @return TableResult from the execution
+     */
+    protected TableResult processProducts(TableEnvironment tableEnv) {
+        Table products = tableEnv.from("products");
+        Table enrichedProducts =
+                products.select(
+                        $("product_id"),
+                        $("price"),
+                        $("quantity"),
+                        $("ts").as("timestamp"),
+                        call("lookup_name", $("product_id")).as("name"));
+
+        // Execute the query - will fail with NoClassDefFoundError for 
StringSubstitutor
+        TableResult result = 
enrichedProducts.executeInsert("enriched_products");
+
+        LOG.info("Product enrichment transformation created");
+
+        return result;
+    }
+
+    /**
+     * Application entry point: runs the example and waits on the returned {@link TableResult}.
+     *
+     * <p>Success is logged only after {@code await()} has returned. Any failure is logged with
+     * its full cause chain and is not rethrown.
+     *
+     * @param args command-line arguments (unused)
+     * @throws Exception declared on the signature; failures are caught and logged below
+     */
+    public static void main(String[] args) throws Exception {
+        AsyncScalarFunctionExample example = new AsyncScalarFunctionExample();
+
+        try {
+            TableResult result = example.execute();
+
+            // Wait on the result first, so that success is not reported for a job that is
+            // still running or that fails afterwards.
+            result.await();
+            LOG.info("Job completed successfully");
+        } catch (Throwable t) {
+            // Log the exception chain to find the root cause
+            LOG.error("Job failed with exception", t);
+        }
+    }
+
+    /**
+     * Mock implementation of an AsyncScalarFunction that simulates async lookups.
+     *
+     * <p>Each call hands a task to a two-thread pool; the task completes the supplied future with
+     * the literal prefix {@code "Product "} followed by the key.
+     */
+    public static class MockAsyncLookupFunction extends AsyncScalarFunction {
+        private static final long serialVersionUID = 1L;
+
+        /** Label supplied at construction time; it is stored but not read by {@code eval}. */
+        private final String fieldType;
+
+        /** Worker pool created in {@code open}; transient, so it is not serialized. */
+        private transient ExecutorService executorService;
+
+        /**
+         * Creates the function.
+         *
+         * @param fieldType label stored on the instance
+         */
+        public MockAsyncLookupFunction(String fieldType) {
+            this.fieldType = fieldType;
+        }
+
+        /**
+         * Creates the two-thread worker pool used by {@code eval}.
+         *
+         * @param context function context (unused)
+         */
+        @Override
+        public void open(FunctionContext context) throws Exception {
+            executorService =
+                    Executors.newFixedThreadPool(2, new ExecutorThreadFactory("mock-lookup"));
+        }
+
+        // The correct implementation for AsyncScalarFunction.eval()
+        // - Must return void
+        // - Must take a CompletableFuture as the first parameter
+        /**
+         * Completes {@code resultFuture} on a worker thread.
+         *
+         * @param resultFuture future that receives the result
+         * @param key value appended to the {@code "Product "} prefix
+         */
+        public void eval(CompletableFuture<String> resultFuture, String key) {
+            executorService.submit(
+                    () -> {
+                        resultFuture.complete("Product " + key);
+                    });
+        }
+
+        /**
+         * Shuts the worker pool down, waiting up to five seconds before cancelling what is left.
+         *
+         * @throws Exception the {@link InterruptedException} if the wait is interrupted; the pool
+         *     is cancelled before it is rethrown
+         */
+        @Override
+        public void close() throws Exception {
+            if (executorService == null) {
+                // No pool was created, so there is nothing to release.
+                return;
+            }
+
+            executorService.shutdown();
+            try {
+                if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+                    executorService.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                // Cancel the remaining tasks as well when the wait itself is interrupted,
+                // then let the interruption propagate exactly as before.
+                executorService.shutdownNow();
+                throw e;
+            }
+        }
+    }
+}
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/test/java/org/apache/flink/table/test/async/AsyncScalarFunctionTest.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/test/java/org/apache/flink/table/test/async/AsyncScalarFunctionTest.java
new file mode 100644
index 00000000000..b527d11c29d
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/test/java/org/apache/flink/table/test/async/AsyncScalarFunctionTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.async;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test that runs the AsyncScalarFunctionExample.
+ *
+ * <p>This test verifies that AsyncScalarFunction works correctly. The test passes if the
+ * application submits its insert and the returned result can be awaited without errors.
+ */
+public class AsyncScalarFunctionTest {
+
+    @Test
+    public void testAsyncScalarFunction() throws Exception {
+        // Create and run the example application. execute() only hands back the TableResult
+        // of the insert, so it is awaited here; otherwise a failure that happens while the
+        // job runs would never reach this test.
+        AsyncScalarFunctionExample example = new AsyncScalarFunctionExample();
+        example.execute().await();
+
+        // If we reach here without exceptions, the test passes
+    }
+}
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 3acd5b60fc7..6d34946f7db 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -71,6 +71,8 @@ under the License.
                <module>flink-end-to-end-tests-jdbc-driver</module>
                <module>flink-end-to-end-tests-restclient</module>
                <module>flink-failure-enricher-test</module>
+               <module>flink-end-to-end-tests-table-api</module>
+               
     </modules>
 
        <dependencies>
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
index bc3b004bc0c..289a3f23fa6 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
@@ -72,6 +72,7 @@ public class PlannerModule {
                                     "org.codehaus.commons",
                                     "org.apache.commons.lang3",
                                     "org.apache.commons.math3",
+                                    "org.apache.commons.text",
                                     // with hive dialect, hadoop jar should be 
in classpath,
                                     // also, we should make it loaded by owner 
classloader,
                                     // otherwise, it'll throw class not found 
exception

Reply via email to