[ https://issues.apache.org/jira/browse/FLINK-19754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317029#comment-17317029 ]
JieFang.He commented on FLINK-19754: ------------------------------------ It happens when use UI or restAPI to submit the job when Job has more then one execute like that {code:java} bsTableEnv.executeSql(insertSql); bsEnv.execute("bsEnv"); {code} but it can run with flink run, are there any restrictions on restAPIĀ > Cannot have more than one execute() or executeAsync() call in a single > environment. > ----------------------------------------------------------------------------------- > > Key: FLINK-19754 > URL: https://issues.apache.org/jira/browse/FLINK-19754 > Project: Flink > Issue Type: Bug > Components: Table SQL / API > Affects Versions: 1.11.2 > Reporter: little-tomato > Priority: Major > > I run this code on my Standalone Cluster. When i submit the job,the error log > is as follows: > {code} > 2020-10-20 11:53:42,969 WARN > org.apache.flink.client.deployment.application.DetachedApplicationRunner [] - > Could not execute application: > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error: Cannot have more than one execute() or executeAsync() call > in a single environment. > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100) > ~[flink-dist_2.12-1.11.2.jar:1.11.2] > at > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) > [?:1.8.0_221] > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > [?:1.8.0_221] > at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_221] > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > [?:1.8.0_221] > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > [?:1.8.0_221] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > [?:1.8.0_221] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > [?:1.8.0_221] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221] > Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more > than one execute() or executeAsync() call in a single environment. > at > org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697) > ~[flink-dist_2.12-1.11.2.jar:1.11.2] > at cn.cuiot.dmp.ruleengine.job.RuleEngineJob.main(RuleEngineJob.java:556) > ~[?:?] > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_221] > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:1.8.0_221] > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_221] > at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_221] > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) > ~[flink-clients_2.12-1.11.0.jar:1.11.0] > {code} > my code is: > {code:java} > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > EnvironmentSettings bsSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings); > ... > FlinkKafkaConsumer<String> myConsumer = new > FlinkKafkaConsumer<String>("kafkatopic", new SimpleStringSchema(), > properties); > myConsumer.setStartFromLatest(); > DataStream<String> kafkaDataStream = env.addSource(myConsumer); > SingleOutputStreamOperator<MessageInfo> sourceStream = kafkaDataStream > .map(new MapFunction<String, MessageInfo>() > { ... } > ); > DataStream<Row> dataStreamRow = sourceStream.map(new > MyMapFunction()).filter(new RuleDataProccessFunction()).map(new > MapFunction<MessageInfo, Row>() > { private static final long serialVersionUID = 1L; @Override public Row > map(MessageInfo value) throws Exception \\{ ... } > }).returns(new RowTypeInfo(rowTypeArr, fieldArr)); > tEnv.registerFunction("test",new TestFunction()); > Table table = tEnv.fromDataStream(dataStreamRow, fieldStr); > tEnv.createTemporaryView("mytable", table); > String ddl = "CREATE TABLE user_log_1155 ...from kafka topic:user_log_1155"; > tEnv.executeSql(ddl); > String ddl1 = "CREATE TABLE user_test_1155 ...from kafka > topic:user_test_1155"; > tEnv.executeSql(ddl); > StatementSet stmtSet = tEnv.createStatementSet(); > stmtSet.addInsertSql("INSERT INTO user_log_1155 SELECT xxx from mytable"); > stmtSet.addInsertSql("INSERT INTO user_test_1155 SELECT xxx from mytable"); > stmtSet.execute(); > env.execute(requestPrm.getString("xxx")); > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)