Re: Error when remotely debugging Flink from IDEA in a local environment
Make sure the project compiles correctly first, then debug.

tangzhi8...@gmail.com wrote on Monday, June 28, 2021 at 3:02 PM:

> Goal: remotely debug Flink from IDEA in a local environment
> Steps:
> 1. This is the Debug configuration
> 2. Error stack trace:
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'Streaming WordCount'.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:374)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:120)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:817)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:249)
>     at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1060)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1148)
>     at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1148)
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'Streaming WordCount'.
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1984)
>     at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
>     at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1845)
>     at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:97)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:357)
>     ... 8 more
> Caused by: java.lang.RuntimeException: Error while waiting for job to be initialized
>     at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:166)
>     at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:82)
>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>     at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>     at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>     at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>     at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$null$0(AbstractSessionClusterExecutor.java:83)
>     at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:146)
>     ... 9 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
>     at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.rest.util.RestClientException: Response was neither of the expected type([simple type, class org.apache.flink.runtime.rest.messages.job.JobDetailsInfo]) nor an error.
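The trace in this thread nests several "Caused by" layers, and the innermost one is often the most useful starting point when debugging. As a minimal sketch in plain Java (no Flink dependency), the cause chain can be walked like this. The class name `RootCauseDemo`, its helper methods, and the stand-in exception types are assumptions made for illustration; only the messages are taken from the trace.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class RootCauseDemo {

    /** Returns the exception followed by each of its causes, outermost first. */
    static List<Throwable> causeChain(Throwable t) {
        Objects.requireNonNull(t, "t");
        List<Throwable> chain = new ArrayList<>();
        // Stop at the end of the chain, or if a cause refers back to an
        // exception we have already seen (guards against cycles).
        while (t != null && !chain.contains(t)) {
            chain.add(t);
            t = t.getCause();
        }
        return chain;
    }

    /** Returns the innermost cause, or the exception itself if it has none. */
    static Throwable rootCause(Throwable t) {
        List<Throwable> chain = causeChain(t);
        return chain.get(chain.size() - 1);
    }

    public static void main(String[] args) {
        // Stand-in exceptions (hypothetical types) mirroring the nesting order of the trace.
        Throwable rest = new IllegalStateException(
                "Response was neither of the expected type nor an error.");
        Throwable retry = new RuntimeException(
                "Could not complete the operation. Number of retries has been exhausted.", rest);
        Throwable init = new RuntimeException(
                "Error while waiting for job to be initialized", retry);
        Throwable top = new RuntimeException(
                "Failed to execute job 'Streaming WordCount'.", init);

        for (Throwable t : causeChain(top)) {
            System.out.println(t.getClass().getSimpleName() + ": " + t.getMessage());
        }
        System.out.println("Root cause: " + rootCause(top).getMessage());
    }
}
```

In a real client, the same helper could be applied to the exception caught around `execute()`, so that the innermost message is logged first.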
Re: Are checkpoints not generated in local execution mode?
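Hi,

As long as checkpointing is enabled, checkpoints will be generated, regardless of the execution mode. Check the logs to see whether the JM is triggering checkpoints normally.

Best,
唐云

From: casel.chen
Sent: Tuesday, June 29, 2021 9:55
To: user-zh@flink.apache.org
Subject: Are checkpoints not generated in local execution mode?

I am running Flink SQL in local execution mode, writing data from Kafka to MongoDB. The MongoDB connector is one I developed myself, and it implements the CheckpointedFunction interface. While debugging I found that the invoke method is called when data arrives, but initializeState and snapshotState are never called. I have enabled checkpointing, and when the same program is deployed on Kubernetes, snapshotState is called. My question: are checkpoints not generated in local execution mode?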
How can the logging configuration be changed dynamically for Flink on native K8s?
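Flink is running on native K8s. I now want to change the Root Logger Level, dynamically add Logger Name -> Logger Level entries, and let users pass in a custom log template. Is there currently a way to do this?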
User-defined function does not receive its arguments correctly
I defined a table backed by a Kafka source and call a user-defined function in the SQL query, but the arguments are not passed correctly to the function's eval method. I am using Flink 1.10.0.

The DDL for the JSON table is:

    private static final String personKafkaTable = "CREATE TABLE hw_person_normal_t(\n"
        + " data ARRAY>,\n"
        + " key STRING,\n"
        + " operation STRING\n"
        + ") with (\n"
        + "'connector.type' = 'kafka', \n"
        + "'connector.version' = 'universal',\n"
        + "'connector.topic' = 'HR_SALARY_FLINK_TEST',\n"
        + "'connector.properties.zookeeper.connect' = 'xxx',\n"
        + "'connector.properties.bootstrap.servers' = 'xxx',\n"
        + " 'connector.properties.group.id' = 'salaryGroup',\n"
        + " 'format.type' = 'json'\n"
        + ")";

The SQL query calls the user-defined function as follows:

    Table tempTable = tEnv.sqlQuery("select data from hw_person_normal_t")
        .joinLateral("ParserJsonFunc(data) as (personNormalId, uuId, lastOrgId, lastDepartmentCode, operationType)")
        .select("personNormalId, uuId, lastOrgId, lastDepartmentCode, operationType");

While debugging, I found that the value argument passed to eval contains 7 entries, but every entry is empty.

The user-defined function is:

    public class ParserJsonPersonNormalFunc extends TableFunction {
        private static final Logger log = LoggerFactory.getLogger(ParserJsonPersonNormalFunc.class);

        public void eval(Row[] value) {
            try {
                log.info("eval start");
                collector.collect(Row.of(value));
            } catch (Exception e) {
                log.error("parser json failed :", e);
            }
        }

        @Override
        public TypeInformation getResultType() {
            return Types.ROW(Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING);
        }
    }

The function is registered in the code:

    tEnv.sqlUpdate(personKafkaTable);
    tEnv.registerFunction("ParserJsonFunc", new ParserJsonPersonNormalFunc());

The message body has the following format:

    {
      "beforeData": [],
      "byteSize": 272,
      "columnNumber": 32,
      "data": [
        { "byteSize": 8, "columnName": "APPLY_PERSON_ID", "rawData": 10017, "type": "LONG" },
        { "byteSize": 12, "columnName": "UPDATE_SALARY", "rawData": "11000.00", "type": "DOUBLE" },
        { "byteSize": 11, "columnName": "UP_AMOUNT", "rawData": "1000.00", "type": "DOUBLE" },
        { "byteSize": 3, "columnName": "CURRENCY", "rawData": "CNY", "type": "STRING" },
        { "byteSize": 32, "columnName": "EXCHANGE_RATE", "rawData": "1.00", "type": "DOUBLE" },
        { "byteSize": 11, "columnName": "DEDUCTED_ACCOUNT", "rawData": "1000.00", "type": "DOUBLE" },
        { "byteSize": 1, "columnName": "ENTER_AT_PROCESS", "rawData": "Y", "type": "STRING" }
      ],
      "dataCount": 0,
      "dataMetaData": {
        "connector": "mysql",
        "pos": 1000368076,
        "row": 0,
        "ts_ms": 1625565737000,
        "snapshot": "false",
        "db": "testdb",
        "table": "flow_person_t"
      },
      "key": "APPLY_PERSON_ID",
      "memorySize": 1120,
      "operation": "insert",
      "rowIndex": -1,
      "timestamp": "1970-01-01 00:00:00"
    }
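For reference, each element of the "data" array in the message carries one column as a (columnName, rawData, type) entry. A minimal sketch in plain Java of turning such entries into a name-to-value lookup might look like the following. It does not use Flink or a JSON library, and it is not the poster's function: the `ColumnEntriesDemo` class, the `ColumnEntry` type, and the `toColumnMap` helper are all hypothetical, and treating every rawData value as a string is an assumption (the first entry in the sample message is a JSON number).

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ColumnEntriesDemo {

    /** Hypothetical holder for one element of the "data" array. */
    static final class ColumnEntry {
        final String columnName;
        final String rawData; // assumption: all raw values are carried as strings
        final String type;

        ColumnEntry(String columnName, String rawData, String type) {
            this.columnName = columnName;
            this.rawData = rawData;
            this.type = type;
        }
    }

    /** Builds a columnName -> rawData lookup, preserving the order of the entries. */
    static Map<String, String> toColumnMap(List<ColumnEntry> entries) {
        Map<String, String> columns = new LinkedHashMap<>();
        for (ColumnEntry entry : entries) {
            columns.put(entry.columnName, entry.rawData);
        }
        return columns;
    }

    public static void main(String[] args) {
        // Three of the seven entries from the sample message, hard-coded for illustration.
        List<ColumnEntry> entries = Arrays.asList(
                new ColumnEntry("APPLY_PERSON_ID", "10017", "LONG"),
                new ColumnEntry("UPDATE_SALARY", "11000.00", "DOUBLE"),
                new ColumnEntry("CURRENCY", "CNY", "STRING"));

        Map<String, String> columns = toColumnMap(entries);
        System.out.println(columns.get("APPLY_PERSON_ID"));
        System.out.println(columns.keySet());
    }
}
```

A lookup of this shape only helps once the entries arrive populated, so it does not by itself address the empty values described in the question.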