[ https://issues.apache.org/jira/browse/FLINK-10263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627711#comment-16627711 ]
ASF GitHub Bot commented on FLINK-10263: ---------------------------------------- asfgit closed pull request #6725: [FLINK-10263] [sql-client] Fix classloader issues in SQL Client URL: https://github.com/apache/flink/pull/6725 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh index b5830725db1..ca0251365e6 100755 --- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh +++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh @@ -212,6 +212,8 @@ tables: type: VARCHAR - name: duplicate_count type: BIGINT + - name: constant + type: VARCHAR connector: type: filesystem path: $RESULT @@ -226,6 +228,8 @@ tables: type: VARCHAR - name: duplicate_count type: BIGINT + - name: constant + type: VARCHAR functions: - name: RegReplace @@ -261,7 +265,7 @@ $FLINK_DIR/bin/sql-client.sh embedded \ read -r -d '' SQL_STATEMENT_2 << EOF INSERT INTO CsvSinkTable - SELECT * + SELECT AvroBothTable.*, RegReplace('Test constant folding.', 'Test', 'Success') AS constant FROM AvroBothTable EOF @@ -285,4 +289,4 @@ for i in {1..10}; do sleep 5 done -check_result_hash "SQLClient" $RESULT "dca08a82cc09f6b19950291dbbef16bb" +check_result_hash "SQLClient" $RESULT "0a1bf8bf716069b7269f575f87a802c0" diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java index 85b3e9265a8..552d0b37dca 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java +++ 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java @@ -75,6 +75,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Supplier; /** * Context for executing table programs. This class caches everything that can be cached across @@ -183,6 +184,19 @@ public EnvironmentInstance createEnvironmentInstance() { return tableSinks; } + /** + * Executes the given supplier using the execution context's classloader as thread classloader. + */ + public <R> R wrapClassLoader(Supplier<R> supplier) { + final ClassLoader previousClassloader = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(classLoader); + try { + return supplier.get(); + } finally { + Thread.currentThread().setContextClassLoader(previousClassloader); + } + } + // -------------------------------------------------------------------------------------------- private static CommandLine createCommandLine(Deployment deployment, Options commandLineOptions) { diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java index 3b9e8e99b82..1318043faf1 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java @@ -219,14 +219,16 @@ public TableSchema getTableSchema(SessionContext session, String name) throws Sq @Override public String explainStatement(SessionContext session, String statement) throws SqlExecutionException { - final TableEnvironment tableEnv = getOrCreateExecutionContext(session) + final ExecutionContext<?> context = getOrCreateExecutionContext(session); + final TableEnvironment tableEnv = context 
.createEnvironmentInstance() .getTableEnvironment(); // translate try { final Table table = createTable(tableEnv, statement); - return tableEnv.explain(table); + // explanation requires an optimization step that might reference UDFs during code compilation + return context.wrapClassLoader(() -> tableEnv.explain(table)); } catch (Throwable t) { // catch everything such that the query does not crash the executor throw new SqlExecutionException("Invalid SQL statement.", t); @@ -242,7 +244,7 @@ public ResultDescriptor executeQuery(SessionContext session, String query) throw @Override public TypedResult<List<Tuple2<Boolean, Row>>> retrieveResultChanges(SessionContext session, String resultId) throws SqlExecutionException { - final DynamicResult result = resultStore.getResult(resultId); + final DynamicResult<?> result = resultStore.getResult(resultId); if (result == null) { throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'."); } @@ -254,7 +256,7 @@ public ResultDescriptor executeQuery(SessionContext session, String query) throw @Override public TypedResult<Integer> snapshotResult(SessionContext session, String resultId, int pageSize) throws SqlExecutionException { - final DynamicResult result = resultStore.getResult(resultId); + final DynamicResult<?> result = resultStore.getResult(resultId); if (result == null) { throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'."); } @@ -266,7 +268,7 @@ public ResultDescriptor executeQuery(SessionContext session, String query) throw @Override public List<Row> retrieveResultPage(String resultId, int page) throws SqlExecutionException { - final DynamicResult result = resultStore.getResult(resultId); + final DynamicResult<?> result = resultStore.getResult(resultId); if (result == null) { throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'."); } @@ -350,7 +352,7 @@ public void stop(SessionContext 
session) { private <C> ProgramTargetDescriptor executeUpdateInternal(ExecutionContext<C> context, String statement) { final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance(); - applyUpdate(envInst.getTableEnvironment(), envInst.getQueryConfig(), statement); + applyUpdate(context, envInst.getTableEnvironment(), envInst.getQueryConfig(), statement); // create job graph with dependencies final String jobName = context.getSessionContext().getName() + ": " + statement; @@ -392,7 +394,11 @@ public void stop(SessionContext session) { final String jobName = context.getSessionContext().getName() + ": " + query; final JobGraph jobGraph; try { - table.writeToSink(result.getTableSink(), envInst.getQueryConfig()); + // writing to a sink requires an optimization step that might reference UDFs during code compilation + context.wrapClassLoader(() -> { + table.writeToSink(result.getTableSink(), envInst.getQueryConfig()); + return null; + }); jobGraph = envInst.createJobGraph(jobName); } catch (Throwable t) { // the result needs to be closed as long as @@ -435,10 +441,14 @@ private Table createTable(TableEnvironment tableEnv, String selectQuery) { /** * Applies the given update statement to the given table environment with query configuration. 
*/ - private void applyUpdate(TableEnvironment tableEnv, QueryConfig queryConfig, String updateStatement) { + private <C> void applyUpdate(ExecutionContext<C> context, TableEnvironment tableEnv, QueryConfig queryConfig, String updateStatement) { // parse and validate statement try { - tableEnv.sqlUpdate(updateStatement, queryConfig); + // update statement requires an optimization step that might reference UDFs during code compilation + context.wrapClassLoader(() -> { + tableEnv.sqlUpdate(updateStatement, queryConfig); + return null; + }); } catch (Throwable t) { // catch everything such that the statement does not crash the executor throw new SqlExecutionException("Invalid SQL update statement.", t); diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala index f58e12cfea2..2b50bb92c9a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala @@ -23,7 +23,6 @@ import java.util import org.apache.calcite.plan.RelOptPlanner import org.apache.calcite.rex.{RexBuilder, RexNode} import org.apache.calcite.sql.`type`.SqlTypeName -import org.apache.commons.lang3.StringEscapeUtils import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.api.java.typeutils.RowTypeInfo @@ -101,7 +100,10 @@ class ExpressionReducer(config: TableConfig) |""".stripMargin, resultType) - val clazz = compile(getClass.getClassLoader, generatedFunction.name, generatedFunction.code) + val clazz = compile( + Thread.currentThread().getContextClassLoader, + generatedFunction.name, + generatedFunction.code) val function = clazz.newInstance() // execute ---------------------------------------------------------------- This 
is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > User-defined function with LITERAL parameters yields CompileException > -------------------------------------------------------------------- > > Key: FLINK-10263 > URL: https://issues.apache.org/jira/browse/FLINK-10263 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Affects Versions: 1.7.0 > Reporter: Fabian Hueske > Assignee: Timo Walther > Priority: Major > Labels: pull-request-available > > When using a user-defined scalar function only with literal parameters, a > {{CompileException}} is thrown. For example > {code} > SELECT myFunc(CAST(40.750444 AS FLOAT), CAST(-73.993475 AS FLOAT)) > public class MyFunc extends ScalarFunction { > public int eval(float lon, float lat) { > // do something > } > } > {code} > results in > {code} > [ERROR] Could not execute SQL statement. Reason: > org.codehaus.commons.compiler.CompileException: Line 5, Column 10: Cannot > determine simple type name "com" > {code} > The problem is probably caused by the expression reducer because it > disappears if a regular attribute is added to a parameter expression. -- This message was sent by Atlassian JIRA (v7.6.3#76005)