Re: Error when remote-debugging Flink from IDEA in a local environment

2021-07-09  Post by r pp
Make sure the project compiles and runs correctly first, then attach the debugger.
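For reference, remote debugging is usually wired up by starting the Flink JVMs with a JDWP agent and attaching IDEA to it. A minimal sketch follows; the port 5005 is the conventional default and is only an illustrative assumption, not taken from the original setup:

    # flink-conf.yaml: start the Flink JVMs with a JDWP debug agent
    # suspend=n lets the process start without waiting for the debugger to attach
    env.java.opts: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"

In IDEA, a "Remote JVM Debug" run configuration pointing at the host running the Flink process and the same port (5005 here) can then attach to it.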

tangzhi8...@gmail.com wrote on Monday, June 28, 2021 at 3:02 PM:

> Goal: remote-debug Flink from IDEA in my local environment
> Steps:
> 1. This is the Debug run configuration
> 2. Error stack trace:
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'Streaming WordCount'.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:374)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:120)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:817)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:249)
>     at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1060)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1148)
>     at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1148)
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'Streaming WordCount'.
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1984)
>     at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
>     at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1845)
>     at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:97)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:357)
>     ... 8 more
> Caused by: java.lang.RuntimeException: Error while waiting for job to be initialized
>     at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:166)
>     at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:82)
>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>     at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>     at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>     at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>     at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$null$0(AbstractSessionClusterExecutor.java:83)
>     at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:146)
>     ... 9 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
>     at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.rest.util.RestClientException: Response was neither of the expected type([simple type, class org.apache.flink.runtime.rest.messages.job.JobDetailsInfo]) nor an error.

Re: Are checkpoints not generated in local execution mode?

2021-07-09  Post by Yun Tang
Hi

As long as checkpointing is enabled, checkpoints will definitely be generated; this has nothing to do with the execution mode. Check the logs to see whether the JobManager actually triggered the checkpoints.

Best wishes,
Yun Tang

From: casel.chen 
Sent: Tuesday, June 29, 2021 9:55
To: user-zh@flink.apache.org 
Subject: Are checkpoints not generated in local execution mode?

I am running a Flink SQL job locally in local execution mode, writing data from Kafka to MongoDB. The MongoDB connector is developed by myself and implements the CheckpointedFunction interface. While debugging I can see that invoke is called when data arrives, but initializeState and snapshotState are never called, even though I have enabled checkpointing. When the same program is deployed on Kubernetes, snapshotState is called. My question is: are checkpoints not generated in local execution mode?
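For reference, a minimal sketch of what enabling checkpointing looks like in code is shown below; the interval, state backend, and checkpoint path are illustrative assumptions, not taken from the original job:

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Without this call no checkpoints are triggered, in any execution mode.
    env.enableCheckpointing(10_000); // trigger a checkpoint every 10 seconds
    // Optional: write checkpoints to the local filesystem so they can be inspected during a local run.
    env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));

If checkpointing is active, the JobManager log should show the checkpoints being triggered, which is what Yun Tang suggests checking.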


How can the logging configuration be changed dynamically for Flink on native Kubernetes?

2021-07-09  Post by casel.chen
Flink is running on native Kubernetes. I would like to change the Root Logger Level, dynamically add Logger Name -> Logger Level entries, and also let users pass in a custom logging template. Is there currently any way to do this?
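For context, these levels normally come from the log4j configuration file that Flink loads at startup (on Kubernetes typically conf/log4j-console.properties, assuming a Flink version that ships log4j2). A sketch of the relevant entries follows; the logger name below is an illustrative example, not from the original setup:

    # root logger level
    rootLogger.level = INFO
    # a named logger with its own level
    logger.myconnector.name = com.example.connector
    logger.myconnector.level = DEBUG
    # log4j2 re-reads the file at runtime when a monitor interval (seconds) is set
    monitorInterval = 30

With monitorInterval set, log4j2 periodically re-reads the file, so editing the mounted configuration is one way to change levels without a restart; whether this fits a native Kubernetes deployment is the open question here.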

User-defined function does not receive its parameters correctly

2021-07-09  Post by Chenzhiyuan(HR)

I defined a table backed by Kafka and call a user-defined function in a SQL query, but the parameter is not passed correctly to the UDF's eval method.
I am using Flink 1.10.0.


The DDL of the JSON-sourced table is as follows:
private static final String personKafkaTable = "CREATE TABLE hw_person_normal_t(\n"
    + " data ARRAY>,\n"
    + " key STRING,\n"
    + " operation STRING\n"
    + ") with (\n"
    + "'connector.type' = 'kafka', \n"
    + "'connector.version' = 'universal',\n"
    + "'connector.topic' = 'HR_SALARY_FLINK_TEST',\n"
    + "'connector.properties.zookeeper.connect' = 'xxx',\n"
    + "'connector.properties.bootstrap.servers' = 'xxx',\n"
    + " 'connector.properties.group.id' = 'salaryGroup',\n"
    + " 'format.type' = 'json'\n"
    + ")";


The user-defined function is called in the SQL query as follows:

Table tempTable = tEnv.sqlQuery("select data from hw_person_normal_t")
    .joinLateral("ParserJsonFunc(data) as (personNormalId, uuId, lastOrgId, lastDepartmentCode, operationType)")
    .select("personNormalId, uuId, lastOrgId, lastDepartmentCode, operationType");


While debugging I found that the value parameter passed to the UDF's eval method contains 7 entries, but every entry is empty.


The user-defined function is defined as follows:

public class ParserJsonPersonNormalFunc extends TableFunction<Row> {

    private static final Logger log = LoggerFactory.getLogger(ParserJsonPersonNormalFunc.class);

    public void eval(Row[] value) {
        try {
            log.info("eval start");
            collector.collect(Row.of(value));
        } catch (Exception e) {
            log.error("parser json failed :", e);
        }
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING);
    }
}

The function is registered in the code:

tEnv.sqlUpdate(personKafkaTable);
tEnv.registerFunction("ParserJsonFunc", new ParserJsonPersonNormalFunc());
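For comparison, the registered function can also be invoked directly from SQL with a LATERAL TABLE join; this is a sketch reusing the column names from the query above, not code from the original post:

Table tempTable = tEnv.sqlQuery(
    "SELECT t.personNormalId, t.uuId, t.lastOrgId, t.lastDepartmentCode, t.operationType "
        + "FROM hw_person_normal_t, "
        + "LATERAL TABLE(ParserJsonFunc(data)) AS t(personNormalId, uuId, lastOrgId, lastDepartmentCode, operationType)");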

The message body format is as follows:

{
  "beforeData": [],
  "byteSize": 272,
  "columnNumber": 32,
  "data": [{
    "byteSize": 8,
    "columnName": "APPLY_PERSON_ID",
    "rawData": 10017,
    "type": "LONG"
  }, {
    "byteSize": 12,
    "columnName": "UPDATE_SALARY",
    "rawData": "11000.00",
    "type": "DOUBLE"
  }, {
    "byteSize": 11,
    "columnName": "UP_AMOUNT",
    "rawData": "1000.00",
    "type": "DOUBLE"
  }, {
    "byteSize": 3,
    "columnName": "CURRENCY",
    "rawData": "CNY",
    "type": "STRING"
  }, {
    "byteSize": 32,
    "columnName": "EXCHANGE_RATE",
    "rawData": "1.00",
    "type": "DOUBLE"
  }, {
    "byteSize": 11,
    "columnName": "DEDUCTED_ACCOUNT",
    "rawData": "1000.00",
    "type": "DOUBLE"
  }, {
    "byteSize": 1,
    "columnName": "ENTER_AT_PROCESS",
    "rawData": "Y",
    "type": "STRING"
  }],
  "dataCount": 0,
  "dataMetaData": {
    "connector": "mysql",
    "pos": 1000368076,
    "row": 0,
    "ts_ms": 1625565737000,
    "snapshot": "false",
    "db": "testdb",
    "table": "flow_person_t"
  },
  "key": "APPLY_PERSON_ID",
  "memorySize": 1120,
  "operation": "insert",
  "rowIndex": -1,
  "timestamp": "1970-01-01 00:00:00"
}