Re: Flink in HA mode: client job submission fails after restarting the ZK cluster

2024-07-11 Post by wjw_bigd...@163.com
Unsubscribe



 Original message 
| From | love_h1...@126.com |
| Date | 2024-07-11 16:10 |
| To | user-zh@flink.apache.org |
| Cc | |
| Subject | Flink in HA mode: client job submission fails after restarting the ZK cluster |
Symptoms:
Flink 1.11.6, Standalone HA mode. The ZooKeeper cluster was rolling-restarted, and multiple jobs were then submitted with the flink run command from one node of the Flink cluster.
Some of the submissions failed with the following exception:
[Flink-DispatcherRestEndpoint-thread-2] - [WARN ] - 
[org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(line:290)]
 - Could not create remote rpc invocation message. Failing rpc invocation 
because...
java.io.IOException: The rpc invocation size 12532388 exceeds the maximum akka 
framesize.
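For reference (not from the original thread): 12,532,388 bytes is above Flink's default akka framesize of 10 MB (10485760b), which is what this check enforces. A hedged flink-conf.yaml sketch of the mitigation usually tried first, with an illustrative value:

# flink-conf.yaml -- illustrative value, tune to the actual payload size
# default is 10485760b (10 MB); the failing invocation above was about 12.5 MB
akka.framesize: 20971520b

Whether enlarging the frame is the right fix (rather than shrinking what gets shipped with the submission) depends on why the RPC payload grew this large after the ZK restart.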


Log details:
The JobManager log on node A of the cluster shows it being granted leadership:
17:19:45,433 - [flink-akka.actor.default-dispatcher-22] - [INFO ] - 
[org.apache.flink.runtime.resourcemanager.ResourceManager.tryAcceptLeadership(line:1118)]
 - ResourceManager 
akka.tcp://flink@10.10.160.57:46746/user/rpc/resourcemanager_0 was granted 
leadership with fencing token ad84d46e902e0cf6da92179447af4e00
17:19:45,434 - [main-EventThread] - [INFO ] - 
[org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.grantLeadership(line:931)]
 - http://XXX:XXX was granted leadership with 
leaderSessionID=f60df688-372d-416b-a965-989a59b37feb
17:19:45,437 - [flink-akka.actor.default-dispatcher-22] - [INFO ] - 
[org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.start(line:287)]
 - Starting the SlotManager.
17:19:45,480 - [main-EventThread] - [INFO ] - 
[org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.startInternal(line:97)]
 - Start SessionDispatcherLeaderProcess.XXX
17:19:45,489 - [cluster-io-thread-1] - [INFO ] - 
[org.apache.flink.runtime.rpc.akka.AkkaRpcService.startServer(line:232)] - 
Starting RPC endpoint for 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at 
akka://flink/user/rpc/dispatcher_1 .
17:19:45,495 - [flink-akka.actor.default-dispatcher-23] - [INFO ] - 
[org.apache.flink.runtime.resourcemanager.ResourceManager.registerTaskExecutorInternal(line:891)]
 - Registering TaskManager with ResourceID XX 
(akka.tcp://flink@X:XX/user/rpc/taskmanager_0) at ResourceManager

Two nodes of the Flink cluster (A and B) received job submission requests; both nodes' logs contain the following:
[flink-akka.actor.default-dispatcher-33] - [INFO ] - 
[org.apache.flink.runtime.jobmaster.JobMaster.connectToResourceManager(line:1107)]
 - Connecting to ResourceManager 
akka.tcp://flink@X.X.X.X:46746/user/rpc/resourcemanager_0(ad84d46e902e0cf6da92179447af4e00)
Four JobManager nodes in the cluster logged "Start SessionDispatcherLeaderProcess", and in almost all cases it was followed by a "Stopping SessionDispatcherLeaderProcess" entry; however, nodes A and B show no "Stopping SessionDispatcherLeaderProcess" entry.
[main-EventThread] - [INFO ] - 
[org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.startInternal(line:97)]
 - Start SessionDispatcherLeaderProcess.
[Curator-ConnectionStateManager-0] - [INFO ] - 
[org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.closeInternal(line:134)]
 - Stopping SessionDispatcherLeaderProcess.








Re: This definitely counts as a bug

2024-07-01 Post by wjw_bigd...@163.com
Unsubscribe



 Original message 
| From | Cuixb |
| Date | 2024-07-01 22:16 |
| To | user-zh@flink.apache.org |
| Cc | |
| Subject | Re: This definitely counts as a bug |
The GC pauses are not what I'd call long, but they're certainly not short either: with a 24 GB heap I estimate roughly 10 seconds of unresponsiveness, mostly staying within 10 seconds.
Sent from my iPhone
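As context (not from the thread): a full-GC pause of this length competes directly with the HA and heartbeat timeouts, which is consistent with the leadership loss shown in the quoted error below. A hedged flink-conf.yaml sketch of the two knobs usually checked, shown with their default values:

# flink-conf.yaml -- defaults shown; raise only if long GC pauses cannot be avoided
# ZooKeeper session timeout the JobManager must renew within
high-availability.zookeeper.client.session-timeout: 60000
# heartbeat timeout between JobManager and TaskManagers
heartbeat.timeout: 50000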

> On Jul 1, 2024, at 17:20, rui chen  wrote:
>
> I'd suggest checking the JM's GC behaviour.
>
> wjw_bigd...@163.com  wrote on Mon, Jul 1, 2024 at 17:18:
>
>> 退订
>>
>>
>>
>>  回复的原邮件 
>> | 发件人 | wjw_bigd...@163.com |
>> | 日期 | 2024年07月01日 17:13 |
>> | 收件人 | user-zh |
>> | 抄送至 | |
>> | 主题 | 回复:这绝对算是bug |
>> 退订
>>
>>
>>
>>  回复的原邮件 
>> | 发件人 | 星海<2278179...@qq.com.INVALID> |
>> | 日期 | 2024年06月29日 21:31 |
>> | 收件人 | user-zh |
>> | 抄送至 | |
>> | 主题 | 回复: 这绝对算是bug |
>> 退订
>>
>>
>> -- 原始邮件 --
>> 发件人:
>>  "user-zh"
>><
>> cfso3...@126.com>;退订
>> 发送时间: 2024年6月29日(星期六) 晚上8:24
>> 收件人: "user-zh">
>> 主题: Re: 这绝对算是bug
>>
>>
>>
>> The connection itself is fine; the TM is just busy processing the write stream the whole time. I also checked the load and it isn't actually high, but the process simply stops responding, which triggers the timeout and then the error shown at the start!
>> Sent from my iPhone
>>
>> > On Jun 29, 2024, at 16:49, Zhanghao Chen > >
>> > Hi, judging from the error, the JM lost leadership, which shut down all tasks on the TM. Could you check on the JM side whether the HA connection is having problems?
>> >
>> > Best,
>> > Zhanghao Chen
>> > 
>> > From: Cuixb > > Sent: Saturday, June 29, 2024 10:31
>> > To: user-zh@flink.apache.org > > Subject: This definitely counts as a bug
>> >
>> > Production environment: Flink 1.16.2
>> >
>> > 2024-06-29 09:17:23
>> > java.lang.Exception: Job leader for job id
>> 8ccdd299194a686e3ecda602c3c75bf3 lost leadership.
>> >    at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:2310)
>> >    at
>> java.util.Optional.ifPresent(Optional.java:159)
>> >    at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:2308)
>> >    at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
>> >    at
>> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>> >    at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
>> >    at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
>> >    at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
>> >    at akka.japi.pf
>> .UnitCaseStatement.apply(CaseStatements.scala:24)
>> >    at akka.japi.pf
>> .UnitCaseStatement.apply(CaseStatements.scala:20)
>> >    at
>> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>> >    at
>> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>> >    at akka.japi.pf
>> .UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>> >    at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> >    at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>> >    at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>> >    at
>> akka.actor.Actor.aroundReceive(Actor.scala:537)
>> >    at
>> akka.actor.Actor.aroundReceive$(Actor.scala:535)
>> >    at
>> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>> >    at
>> akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>> >    at
>> akka.actor.ActorCell.invoke(ActorCell.scala:548)
>> >    at
>> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>> >    at
>> akka.dispatch.Mailbox.run(Mailbox.scala:231)
>> >    at
>> akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>> >    at
>> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>> >    at
>> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(For
>> >
>> > 发自我的 iPhone
>



Re: How to parse data read from Oracle with CDC

2024-06-25 Post by wjw_bigd...@163.com
Unsubscribing is really a hassle... I've tried several times and still haven't figured it out



 Original message 
| From | ha.fen...@aisino.com |
| Date | 2024-06-25 17:25 |
| To | user-zh |
| Cc | |
| Subject | Re: Re: How to parse data read from Oracle with CDC |
The data itself is fine:
"ID" "NAME"   "ADDTIME""PRICE"
1 "aa" 2024-6-25 14:21:33  12.22

From: 15868861416
Sent: 2024-06-25 17:19
To: user-zh@flink.apache.org
Subject: Re: How to parse data read from Oracle with CDC
Check the data in the DEBEZIUM.PRODUCTS table in Oracle; your parsed output contains strings.




| |
博星
|
|
15868861...@163.com
|


 Original message 
| From | ha.fen...@aisino.com |
| Sent | 2024-06-25 15:54 |
| To | user-zh |
| Subject | How to parse data read from Oracle with CDC |
Following the code from the documentation:
JdbcIncrementalSource<String> oracleChangeEventSource =
new OracleSourceBuilder()
.hostname("host")
.port(1521)
.databaseList("ORCLCDB")
.schemaList("DEBEZIUM")
.tableList("DEBEZIUM.PRODUCTS")
.username("username")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.includeSchemaChanges(true) // output the schema changes as well
.startupOptions(StartupOptions.initial())
.debeziumProperties(debeziumProperties)
.splitSize(2)
.build();
The result returned:
{"before":null,"after":{"ID":{"scale":0,"value":"AQ=="},"NAME":"aa","ADDTIME":1719325293000,"PRICE":"BMY="},"source":{"version":"1.9.8.Final","connector":"oracle","name":"oracle_logminer","ts_ms":0,"snapshot":"false","db":"ORCL","sequence":null,"schema":"CDC","table":"ORDER_CDC","txId":null,"scn":"0","commit_scn":null,"lcr_position":null,"rs_id":null,"ssn":0,"redo_thread":null},"op":"r","ts_ms":1719301769186,"transaction":null}

How should this be parsed? PRICE should be a number, and ID is a number too, but what is shown here doesn't match either.
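Not an answer from the thread, but likely relevant background: Debezium emits Oracle NUMBER columns as Kafka Connect Decimal values, which appear as Base64 byte strings such as "AQ==" and "BMY=" in the JSON above. A hedged sketch of the commonly used workaround, setting the connector's decimal.handling.mode in the debeziumProperties already passed to the builder:

import java.util.Properties;

Properties debeziumProperties = new Properties();
// "string" (or "double") makes numeric columns arrive as readable values instead of
// Base64-encoded Connect Decimal structs; the default is "precise".
debeziumProperties.setProperty("decimal.handling.mode", "string");
// then, as in the quoted code: .debeziumProperties(debeziumProperties) on the OracleSourceBuilder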


Re: use flink 1.19 JDBC Driver can not find jdbc connector

2024-05-13 Post by kellygeorg...@163.com
Unsubscribe



 Replied Message 
| From | abc15...@163.com |
| Date | 05/10/2024 12:26 |
| To | user-zh@flink.apache.org |
| Cc | |
| Subject | Re: use flink 1.19 JDBC Driver can not find jdbc connector |
I've solved it. You need to register the connector in the gateway's jar (its classpath). But this is inconvenient, and I still hope it gets improved.
Sent from my iPhone

> On May 10, 2024, at 11:56, Xuyang  wrote:
>
> Hi, can you print the classloader and verify if the jdbc connector exists in 
> it?
>
>
>
>
> --
>
>Best!
>Xuyang
>
>
>
>
>
> At 2024-05-09 17:48:33, "McClone"  wrote:
>> I put flink-connector-jdbc into flink\lib.use flink 1.19 JDBC Driver can not 
>>  find jdbc connector,but use sql-client is normal.


How can a Flink cluster write its logs directly into Elasticsearch?

2024-03-13 Post by kellygeorg...@163.com
Is there a reasonably quick and convenient solution for this?




Re: flink operator HA job intermittently fails with "unable to update ConfigMapLock"

2024-03-11 Post by kellygeorg...@163.com
Could any expert offer some pointers??? Waiting online



 Original message 
| From | kellygeorg...@163.com |
| Date | 2024-03-11 20:29 |
| To | user-zh |
| Cc | |
| Subject | flink operator HA job intermittently fails with "unable to update ConfigMapLock" |
The JobManager error is shown below; what could be the cause?
Exception occurred while renewing lock: Unable to update ConfigMapLock

Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Operation: [replace] for kind: [ConfigMap] with name: [flink task xx- configmap] in namespace: [default]


Caused by: java.net.SocketTimeoutException: timeout








flink operator HA job intermittently fails with "unable to update ConfigMapLock"

2024-03-11 Post by kellygeorg...@163.com
The JobManager error is shown below; what could be the cause?
Exception occurred while renewing lock: Unable to update ConfigMapLock

Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Operation: [replace] for kind: [ConfigMap] with name: [flink task xx- configmap] in namespace: [default]


Caused by: java.net.SocketTimeoutException: timeout








Flink writing to Kafka: transaction question

2023-06-15 Post by 163
As I understand it, Kafka supports transactions: with checkpointing and exactly-once enabled, data should only become readable in Kafka once a checkpoint has completed. Test: a Flink job reads Kafka topic a and writes to topic b, with checkpointing and exactly-once enabled. Before the job has completed its next checkpoint, new data can already be consumed from topic b. What is the reason for this? Any advice is appreciated!
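Not part of the original post, but one common explanation: with transactional writes the records are physically in topic b before the checkpoint commits; whether a consumer sees them depends on its isolation.level, which defaults to read_uncommitted. A minimal consumer sketch showing the read_committed setting (broker address and group id are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "check-topic-b");          // placeholder
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
// only return records of committed transactions, i.e. after the Flink checkpoint commits them
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);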

Unsubscribe

2022-09-10 Post by xudongjun123...@163.com

Unsubscribe


xudongjun123...@163.com


Unsubscribe

2022-09-10 Post by xudongjun123...@163.com

Unsubscribe


xudongjun123...@163.com


Unsubscribe

2022-08-24 Post by xuxuewe...@163.com
Unsubscribe



xuxuewe...@163.com


How to use external database connections correctly in Flink

2022-07-23 Post by lxk7...@163.com

In the current project we need to do real-time lookups against an external database. The main real-time stream is on the order of a million records per day, and the external stores available are MySQL, ClickHouse, and a Redis cache.
Right now the data is landed into ClickHouse in real time, and Flink then does real-time lookups against ClickHouse. (We know ClickHouse's concurrency is not great, but it is what we have, and we need to store tens of millions of rows.)
Two approaches were tested:
1. Querying through a JDBC connection pool, trying both Druid and C3P0. In both cases the job ran for a while and then hit OOM (possibly due to incorrect usage). Heap dumps showed the pools retaining a lot of information, so this approach was abandoned. Also, when using the pool inside Flink the connections were never explicitly closed, except in the close() method.
2. Obtaining connections with DriverManager. With this approach the job has been stable so far, no OOM, and the connections are likewise never closed explicitly.
Questions: 1. How should external database connections be used correctly inside Flink? With a pool, my understanding is that connection management is done by the pool, so there is no need to close connections explicitly; and since a streaming job queries continuously, the connection stays in use anyway. In short, whether pooled or direct, do connections need to be closed in the invoke() method, or is closing them only in close() enough?
  2. Besides caching, are there better ways to optimize this kind of real-time lookup, or an alternative approach altogether?


lxk7...@163.com
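Not from the thread, but a minimal sketch of the pattern the second approach above describes: one connection per parallel subtask, opened in open() and closed in close(), with nothing closed per record in the processing method (class and field names and the JDBC URL are illustrative):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ClickhouseLookupMap extends RichMapFunction<String, String> {
    private transient Connection conn;               // one connection per subtask
    private transient PreparedStatement stmt;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Illustrative URL/credentials; opened once per subtask and reused for every record.
        conn = DriverManager.getConnection("jdbc:clickhouse://host:8123/db", "user", "pwd");
        stmt = conn.prepareStatement("SELECT name FROM dim_table WHERE id = ?");
    }

    @Override
    public String map(String id) throws Exception {
        stmt.setString(1, id);                       // reuse the statement; do not close it here
        try (ResultSet rs = stmt.executeQuery()) {
            return rs.next() ? id + "," + rs.getString(1) : id + ",";
        }
    }

    @Override
    public void close() throws Exception {
        if (stmt != null) stmt.close();              // release resources only when the task shuts down
        if (conn != null) conn.close();
    }
}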


Error when syncing to Hive

2022-07-17 Post by ynz...@163.com
$StoppedState.lambda$start$0(AkkaRpcActor.java:624)
 ~[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
 ~[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:623)
 ~[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
... 20 more
2022-07-15 10:33:02,063 ERROR 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal error 
occurred in the cluster entrypoint.
org.apache.flink.util.FlinkException: JobMaster for job 
7ac231844aff6ea5563e2da39507ef72 failed.
at 
org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:913)
 ~[flink-dist_2.11-1.14.4.jar:1.14.4]
at 
org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:473)
 ~[flink-dist_2.11-1.14.4.jar:1.14.4]
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:430)
 ~[flink-dist_2.11-1.14.4.jar:1.14.4]
at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) 
~[?:1.8.0_301]
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
 ~[?:1.8.0_301]
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
 ~[?:1.8.0_301]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455)
 ~[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
 ~[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455)
 ~[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
 ~[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
 ~[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
 ~[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) 
[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) 
[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) 
[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
at akka.actor.Actor.aroundReceive(Actor.scala:537) 
[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
at akka.actor.Actor.aroundReceive$(Actor.scala:535) 
[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) 
[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) 
[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
at akka.actor.ActorCell.invoke(ActorCell.scala:548) 
[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) 
[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
at akka.dispatch.Mailbox.run(Mailbox.scala:231) 
[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
at akka.dispatch.Mailbox.exec(Mailbox.scala:243) 
[flink-rpc-akka_95cbfbf8-1d54-4569-8d93-600a8c51abbe.jar:1.14.4]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) 
[?:1.8.0_301]
at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) 
[?:1.8.0_301]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) 
[?:1.8.0_301]
at

退订

2022-07-16 文章 wangt...@163.com
退订




Re: Re: flink-hudi-hive

2022-07-12 Post by ynz...@163.com
Yes, 192.168.10.227:35961 is the TM address;
By "repeatedly initializing" I mean that on the Flink web UI overview page, the job's status in the Running Job List stays INITIALIZING the whole time;
There are no TM logs, and I haven't yet figured out why it exited; the TM page of the Flink web UI shows nothing at all throughout;
Below is the log file list; I couldn't find anything useful in it:
directory.info : Total file length is 7201 bytes. 
jobmanager.err : Total file length is 588 bytes. 
jobmanager.log : Total file length is 82894 bytes. 
jobmanager.out : Total file length is 0 bytes. 
launch_container.sh : Total file length is 21758 bytes. 
prelaunch.err : Total file length is 0 bytes. 
prelaunch.out : Total file length is 100 bytes.



best,
ynz...@163.com
 
From: Weihua Hu
Date: 2022-07-12 23:18
To: user-zh
Subject: Re: Re: flink-hudi-hive
From this log alone I can't see continuous failover; which job do you mean by "repeatedly initializing"?
I do see some akka connection exceptions, which may mean the corresponding TM exited abnormally. Could you double-check whether 192.168.10.227:35961 is the TaskManager address, and why it exited?
 
Best,
Weihua
 
 
On Tue, Jul 12, 2022 at 9:37 AM ynz...@163.com  wrote:
 
> 这是job managers所有日志:
> 2022-07-12 09:33:02,280 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: execution.shutdown-on-attached-exit, false
> 2022-07-12 09:33:02,280 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: pipeline.jars,
> file:/home/dataxc/opt/flink-1.14.4/opt/flink-python_2.11-1.14.4.jar
> 2022-07-12 09:33:02,280 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: execution.checkpointing.min-pause, 8min
> 2022-07-12 09:33:02,280 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: restart-strategy, failure-rate
> 2022-07-12 09:33:02,280 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: jobmanager.memory.jvm-metaspace.size, 128m
> 2022-07-12 09:33:02,280 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: state.checkpoints.dir, hdfs:///flink/checkpoints
> 2022-07-12 09:33:02,382 WARN  akka.remote.transport.netty.NettyTransport
>  [] - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused: n103/192.168.10.227:35961
> 2022-07-12 09:33:02,383 WARN  akka.remote.ReliableDeliverySupervisor
>  [] - Association with remote system 
> [akka.tcp://flink@n103:35961]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp://flink@n103:35961]] Caused by:
> [java.net.ConnectException: Connection refused: n103/192.168.10.227:35961]
> 2022-07-12 09:33:02,399 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting
> RPC endpoint for
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager at
> akka://flink/user/rpc/resourcemanager_1 .
> 2022-07-12 09:33:02,405 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Starting the resource manager.
> 2022-07-12 09:33:02,479 INFO
> org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider [] -
> Failing over to rm2
> 2022-07-12 09:33:02,509 INFO
> org.apache.flink.yarn.YarnResourceManagerDriver  [] - Recovered
> 0 containers from previous attempts ([]).
> 2022-07-12 09:33:02,509 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Recovered 0 workers from previous attempt.
> 2022-07-12 09:33:02,514 WARN  akka.remote.transport.netty.NettyTransport
>  [] - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused: n103/192.168.10.227:35961
> 2022-07-12 09:33:02,515 WARN  akka.remote.ReliableDeliverySupervisor
>  [] - Association with remote system 
> [akka.tcp://flink@n103:35961]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp://flink@n103:35961]] Caused by:
> [java.net.ConnectException: Connection refused: n103/192.168.10.227:35961]
> 2022-07-12 09:33:02,528 INFO  org.apache.hadoop.conf.Configuration
>  [] - resource-types.xml not found
> 2022-07-12 09:33:02,528 INFO
> org.apache.hadoop.yarn.util.resource.ResourceUtils   [] - Unable to
> find 'resource-types.xml'.
> 2022-07-12 09:33:02,538 INFO
> org.apache.flink.runtime.externalresource.ExternalResourceUtils [] -
> Enabled external resources: []
> 2022-07-12 09:33:02,541 INFO
> org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Upper
> bound of the thread pool size is 500
> 2022-07-12 09:33:02,584 WARN  akka.remote.transport.netty.NettyTransport
>  [] - Remote connection to [null] failed with
> java.net.ConnectException: Connection refused: n103/192.168.10.227:35961
> 2022-07-12 09:33:02,585 WARN  akka.remote.ReliableDeliverySupervisor
> 

Re: Re: flink-hudi-hive

2022-07-11 Post by ynz...@163.com
This is the complete JobManager log:
2022-07-12 09:33:02,280 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: execution.shutdown-on-attached-exit, false
2022-07-12 09:33:02,280 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: pipeline.jars, 
file:/home/dataxc/opt/flink-1.14.4/opt/flink-python_2.11-1.14.4.jar
2022-07-12 09:33:02,280 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: execution.checkpointing.min-pause, 8min
2022-07-12 09:33:02,280 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: restart-strategy, failure-rate
2022-07-12 09:33:02,280 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: jobmanager.memory.jvm-metaspace.size, 128m
2022-07-12 09:33:02,280 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: state.checkpoints.dir, hdfs:///flink/checkpoints
2022-07-12 09:33:02,382 WARN  akka.remote.transport.netty.NettyTransport
   [] - Remote connection to [null] failed with 
java.net.ConnectException: Connection refused: n103/192.168.10.227:35961
2022-07-12 09:33:02,383 WARN  akka.remote.ReliableDeliverySupervisor
   [] - Association with remote system [akka.tcp://flink@n103:35961] 
has failed, address is now gated for [50] ms. Reason: [Association failed with 
[akka.tcp://flink@n103:35961]] Caused by: [java.net.ConnectException: 
Connection refused: n103/192.168.10.227:35961]
2022-07-12 09:33:02,399 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Starting RPC endpoint for 
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager at 
akka://flink/user/rpc/resourcemanager_1 .
2022-07-12 09:33:02,405 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Starting the resource manager.
2022-07-12 09:33:02,479 INFO  
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider [] - Failing 
over to rm2
2022-07-12 09:33:02,509 INFO  org.apache.flink.yarn.YarnResourceManagerDriver   
   [] - Recovered 0 containers from previous attempts ([]).
2022-07-12 09:33:02,509 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Recovered 0 workers from previous attempt.
2022-07-12 09:33:02,514 WARN  akka.remote.transport.netty.NettyTransport
   [] - Remote connection to [null] failed with 
java.net.ConnectException: Connection refused: n103/192.168.10.227:35961
2022-07-12 09:33:02,515 WARN  akka.remote.ReliableDeliverySupervisor
   [] - Association with remote system [akka.tcp://flink@n103:35961] 
has failed, address is now gated for [50] ms. Reason: [Association failed with 
[akka.tcp://flink@n103:35961]] Caused by: [java.net.ConnectException: 
Connection refused: n103/192.168.10.227:35961]
2022-07-12 09:33:02,528 INFO  org.apache.hadoop.conf.Configuration  
   [] - resource-types.xml not found
2022-07-12 09:33:02,528 INFO  
org.apache.hadoop.yarn.util.resource.ResourceUtils   [] - Unable to 
find 'resource-types.xml'.
2022-07-12 09:33:02,538 INFO  
org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Enabled 
external resources: []
2022-07-12 09:33:02,541 INFO  
org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Upper bound 
of the thread pool size is 500
2022-07-12 09:33:02,584 WARN  akka.remote.transport.netty.NettyTransport
   [] - Remote connection to [null] failed with 
java.net.ConnectException: Connection refused: n103/192.168.10.227:35961
2022-07-12 09:33:02,585 WARN  akka.remote.ReliableDeliverySupervisor
   [] - Association with remote system [akka.tcp://flink@n103:35961] 
has failed, address is now gated for [50] ms. Reason: [Association failed with 
[akka.tcp://flink@n103:35961]] Caused by: [java.net.ConnectException: 
Connection refused: n103/192.168.10.227:35961]



best,
ynz...@163.com
 
From: Weihua Hu
Date: 2022-07-11 19:46
To: user-zh
Subject: Re: flink-hudi-hive
Hi,
By "the job keeps re-initializing", do you mean it keeps failing over? The reason for a job failover can be found in JobManager.log; search for the keyword "to FAILED".
 
Best,
Weihua
 
 
On Mon, Jul 11, 2022 at 2:46 PM ynz...@163.com  wrote:
 
> Hi,
> 我正在使用flink将数据写入hudi并同步至hive,将任务提交到yarn后,我从flink web
> ui看到:相关任务反复初始化,task managers无任何信息。日志中也无明确错误提示 ;
> 当我删除代码中sync_hive相关配置,并且不改变其他配置,数据能正常写入hudi ;
> 我使用的hudi-0.11.1,flink-1.14.4,hadoop-3.3.1,hive-3.1.3 ;
>
>
>
> best,
> ynz...@163.com
>


flink-hudi-hive

2022-07-10 Post by ynz...@163.com
Hi,
I am using Flink to write data into Hudi and sync it to Hive. After submitting the job to YARN, the Flink web UI shows the job repeatedly initializing and the task managers page shows no information at all. The logs contain no clear error either;
When I remove the sync_hive-related configuration from the code and change nothing else, the data is written to Hudi normally;
I am using hudi-0.11.1, flink-1.14.4, hadoop-3.3.1, hive-3.1.3;



best,
ynz...@163.com


Re: Re: Question about data loss with Flink interval join

2022-06-11 Post by lxk7...@163.com
Thanks a lot for the reply.
1. I will run more tests on the watermark, and also test whether interval join loses data when using processing time.
2. My understanding of interval join was that the range of data it can match is larger than an inner join, so the result should be more accurate; seeing data actually get lost was quite a shock and upended my assumptions. I now also suspect that the two streams having different volumes may contribute to the loss: the left stream is order-level data, the right stream is order-item-level data and is much larger. My guess is that processing the right stream is slower, so the event-time progress of the two streams may diverge. But then an inner join should be affected in the same way, which raises a new question.
3. One thing I may not have stated clearly: in the SQL I use an inner join without registering any watermark, so the join of the two streams should effectively be driven by processing time? Does the table state then also expire based on processing time?



lxk7...@163.com
 
From: Shengkai Fang
Sent: 2022-06-11 20:35
To: user-zh
Subject: Re: Re: Question about data loss with Flink interval join
hi,

On the first point: there are many ways to lose data. First confirm that it is really the JOIN operator losing it (misusing the SINK can also lose data). If it clearly is the join operator, then pin down what the lost records look like and whether the watermark is set unreasonably, so that the data is treated as late and dropped. For example, here it is `event time` = `rowtime` - 2s; is that appropriate? I seem to remember it is usually +2s.

On the second point: my initial understanding of interval join is that state cleanup is driven by the event time of both sides, i.e. an update of the right stream's event time affects cleanup of the left stream's data. For example, if the right stream has reached 12:00 and the join condition requires the left stream's time to be no more than 1h earlier than the right stream's time, then all left-stream data before 11:00 can be cleaned up.

On the third point: I don't think so. The current inner join + state cleanup cannot emulate an event-time window join.
 
best,
Shengkai
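Not from the thread: a minimal DataStream-level sketch of the same interval join, mainly to make the watermark and state-cleanup discussion above concrete. The Order/Item POJOs and field names are illustrative; forBoundedOutOfOrderness(2s) yields watermark = max event time - 2s, i.e. the `rowtime - 2s` form discussed above.

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// Assumed POJOs, illustrative only:
// public class Order { public String id; public long ts; }
// public class Item  { public String orderId; public long ts; }
// DataStream<Order> orderSource = ...; DataStream<Item> itemSource = ...; (e.g. from Kafka)

DataStream<Order> orders = orderSource.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((o, ts) -> o.ts));
DataStream<Item> items = itemSource.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Item>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((i, ts) -> i.ts));

orders.keyBy(o -> o.id)
      .intervalJoin(items.keyBy(i -> i.orderId))
      // item.ts in [order.ts - 10s, order.ts + 20s]; state on both sides is
      // cleaned up once the watermarks pass the end of this relative window.
      .between(Time.seconds(-10), Time.seconds(20))
      .process(new ProcessJoinFunction<Order, Item, String>() {
          @Override
          public void processElement(Order o, Item i, Context ctx, Collector<String> out) {
              out.collect(o.id + "," + i.orderId);
          }
      });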
 
lxk7...@163.com  于2022年6月10日周五 23:03写道:
 
> 对于这个问题,我还是有很大的疑问,再把我这个场景描述一下:
>
> 目前是使用flink进行双流join,两个流的数据,一个流是订单主表,另一个流是订单明细表。我们探查了离线的数据,订单明细表一般会在订单主表生成后晚几秒内生成,这个差异在秒级别。
> 我们做了以下几轮测试,并对比了另一个实时落的表数据量。(这个表就是基准参照数据,只是简单落表,没做任何处理,两边的数据源一致,对比的口径一致。)
> 1.使用datastream api,使用kafka自带的时间戳做水印,使用interval join。对比完结果,数据少。
> 2.使用流转表,sql inner join,没有设置watermark。对比完结果数据正常。
> 3.使用流转表,sql interval join,从数据中的事件时间提取水印,对比完结果数据,数据少。
>  从结果上看,我不太明白为什么sql里inner join能保证数据准确,而interval
> join不行?有什么好的方式或者思路能让我更好的去尝试了解这个问题产生的原因
>
> 针对第二种方式,我的疑问是,sql里没有设置水印,那么表的state过期是以处理时间来计算吗?针对这种设置了表state过期时间的join,我能理解为这个inner
> join其实是一个window join吗?
>
>
>
> lxk7...@163.com
>
> 发件人: lxk
> 发送时间: 2022-06-10 18:18
> 收件人: user-zh
> 主题: Re:Re:Re:Re:Re:Re:Re: Flink 使用interval join数据丢失疑问
>
>
>
> 现在改成了sql interval join,代码和执行计划如下,其他配置没变,数据量还是少,使用inner join就没问题
>
>
>
>
> Table headerTable =
> streamTableEnvironment.fromDataStream(headerFilterStream,
>  Schema.newBuilder()
> .columnByExpression("rowtime",
> "CAST(substring(last_updated_at,0,19) AS TIMESTAMP_LTZ(3))")
> .watermark("rowtime", "rowtime - INTERVAL '2' SECOND")
> .build());
> Table itemTable =
> streamTableEnvironment.fromDataStream(filterItemStream, Schema.newBuilder()
> .columnByExpression("rowtime",
> "CAST(substring(last_updated_at,0,19) AS TIMESTAMP_LTZ(3))")
> .watermark("rowtime", "rowtime - INTERVAL '2' SECOND")
> .build());
>
>
>
>
> streamTableEnvironment.createTemporaryView("header",headerTable);
> streamTableEnvironment.createTemporaryView("item",itemTable);
>
>
>
>
>
>
> Table result = streamTableEnvironment.sqlQuery("select
> header.customer_id" +
> ",item.goods_id" +
> ",header.id" +
> ",header.order_status" +
> ",header.shop_id" +
> ",header.parent_order_id" +
> ",header.order_at" +
> ",header.pay_at" +
> ",header.channel_id" +
> ",header.root_order_id" +
> ",item.id" +
> ",item.row_num" +
> ",item.p_sp_sub_amt" +
> ",item.display_qty" +
> ",item.qty" +
> ",item.bom_type" +
> " from header JOIN item on header.id = item.order_id and
> item.rowtime BETWEEN header.rowtime - INTERVAL '10' SECOND AND
> header.rowtime + INTERVAL '20' SECOND");
>
>
>
>
> String intervalJoin = streamTableEnvironment.explainSql("select
> header.customer_id" +
> ",item.goods_id" +
> ",header.id" +
> ",header.order_status" +
> ",header.shop_id" +
> ",header.parent_order_id" +
> ",header.order_at" +
> ",header.pay_at" +
> ",header.channel_id" +
> ",header.root_order_id" +
> ",item.id" +
> ",item.row_num" +
> ",item.p_sp_sub_amt" +
> ",item.display_qty" +
> ",item.qty" +
> ",item.bom_type" +
> " from header JOIN item on header.id = item.order_id and
> item.rowtime BE

Re: Re: Question about data loss with Flink interval join

2022-06-10 Post by lxk7...@163.com
I still have big doubts about this problem, so let me describe my scenario again:
We use Flink for a two-stream join. One stream is the order header table, the other is the order item table. Looking at the offline data, the order item rows are usually generated a few seconds after the order header, so the gap is at the second level.
We ran the following rounds of tests and compared against another real-time landed table. (That table is the reference baseline: it is simply landed without any processing, with the same sources and the same comparison criteria.)
1. DataStream API, using the Kafka-provided timestamps for watermarks, with interval join. Result: fewer records.
2. Stream-to-table, SQL inner join, no watermark configured. Result: record counts match.
3. Stream-to-table, SQL interval join, watermark extracted from the event time in the data. Result: fewer records.
Looking at these results, I don't understand why the SQL inner join is accurate while the interval join is not. Is there a good way or approach that would help me understand what causes this?

For the second approach, my question is: since no watermark is set in the SQL, does the table state expire based on processing time? And given the state TTL configured for that join, can I regard this inner join as effectively a window join?



lxk7...@163.com
 
From: lxk
Sent: 2022-06-10 18:18
To: user-zh
Subject: Re:Re:Re:Re:Re:Re:Re: Question about data loss with Flink interval join



I have now switched to a SQL interval join; the code and execution plan are below. Nothing else in the configuration changed, and the record count is still low, while inner join has no such problem.
 
 
 
 
Table headerTable = 
streamTableEnvironment.fromDataStream(headerFilterStream,   Schema.newBuilder()
.columnByExpression("rowtime", 
"CAST(substring(last_updated_at,0,19) AS TIMESTAMP_LTZ(3))")
.watermark("rowtime", "rowtime - INTERVAL '2' SECOND")
.build());
Table itemTable = 
streamTableEnvironment.fromDataStream(filterItemStream, Schema.newBuilder()
.columnByExpression("rowtime", 
"CAST(substring(last_updated_at,0,19) AS TIMESTAMP_LTZ(3))")
.watermark("rowtime", "rowtime - INTERVAL '2' SECOND")
.build());
 
 
 
 
streamTableEnvironment.createTemporaryView("header",headerTable);
streamTableEnvironment.createTemporaryView("item",itemTable);
 
 
 
 
 
 
Table result = streamTableEnvironment.sqlQuery("select 
header.customer_id" +
",item.goods_id" +
",header.id" +
",header.order_status" +
",header.shop_id" +
",header.parent_order_id" +
",header.order_at" +
",header.pay_at" +
",header.channel_id" +
",header.root_order_id" +
",item.id" +
",item.row_num" +
",item.p_sp_sub_amt" +
",item.display_qty" +
",item.qty" +
",item.bom_type" +
" from header JOIN item on header.id = item.order_id and 
item.rowtime BETWEEN header.rowtime - INTERVAL '10' SECOND AND header.rowtime + 
INTERVAL '20' SECOND");
 
 
 
 
String intervalJoin = streamTableEnvironment.explainSql("select 
header.customer_id" +
",item.goods_id" +
",header.id" +
",header.order_status" +
",header.shop_id" +
",header.parent_order_id" +
",header.order_at" +
",header.pay_at" +
",header.channel_id" +
",header.root_order_id" +
",item.id" +
",item.row_num" +
",item.p_sp_sub_amt" +
",item.display_qty" +
",item.qty" +
",item.bom_type" +
" from header JOIN item on header.id = item.order_id and 
item.rowtime BETWEEN header.rowtime - INTERVAL '10' SECOND AND header.rowtime + 
INTERVAL '20' SECOND");
 
 
System.out.println(intervalJoin);
 
 
DataStream rowDataStream = 
streamTableEnvironment.toChangelogStream(result);
 
 
 
 
 
 
执行计划:
== Abstract Syntax Tree ==
LogicalProject(customer_id=[$2], goods_id=[$16], id=[$0], order_status=[$1], 
shop_id=[$3], parent_order_id=[$4], order_at=[$5], pay_at=[$6], 
channel_id=[$7], root_order_id=[$8], id0=[$13], row_num=[$15], 
p_sp_sub_amt=[$20], display_qty=[$23], qty=[$18], bom_type=[$21])
+- LogicalJoin(condition=[AND(=($0, $14), >=($25, -($12, 1:INTERVAL 
SECOND)), <=($25, +($12, 2:INTERVAL SECOND)))], joinType=[inner])
   :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($12, 
2000:INTERVAL SECOND)])
   :  +- LogicalProject(id=[$0], order_status=[$1], customer_id=[$2], 
shop_id=[$3], parent_order_id=[$4], order_at=[$5], pay_at=[$6], 
channel_id=[$7], root_order_id=[$8], last_updated_at=[$9], business_flag=[$10], 
mysql_op_type=[$11], rowtime=[CAST(SUBSTRING($9, 0, 
19)):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
   : +- LogicalTableScan(table=[[default_catalog, default_database, 
Unregistered_DataStream_Source_5]])
   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($12, 
2000:INTERVAL SECOND)])
  +- LogicalProject(id=[$0], order_id=[$1], row_num=[$2], goods_id=[$3], 
s_sku_code=[$4], qty=[$5],

TaskManager exits after Flink runs for a while, with OutOfMemoryError: Metaspace

2022-06-07 Post by weishishuo...@163.com
The versions I am using:
flink: 1.13.2
flink cdc: flink-connector-jdbc_2.11-1.13.2.jar 
flink-sql-connector-mysql-cdc-2.2.0.jar 
flink-sql-connector-postgres-cdc-2.2.0.jar

The jobs are quite simple: syncing data between MySQL/PG and PG/MySQL using the SQL interface. Has anyone run into this problem?

2022-06-07 18:13:59,393 ERROR 
org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] - Fatal error 
occurred while executing the TaskManager. Shutting it down...
java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has 
occurred. This can mean two things: either the job requires a larger size of 
JVM metaspace to load classes or there is a class loading leak. In the first 
case 'taskmanager.memory.jvm-metaspace.size' configuration option should be 
increased. If the error persists (usually in cluster after several job 
(re-)submissions) then there is probably a class loading leak in user code or 
some of its dependencies which has to be investigated and fixed. The task 
executor has to be shutdown...at java.lang.ClassLoader.defineClass1(Native 
Method) ~[?:1.8.0_112]
at java.lang.ClassLoader.defineClass(ClassLoader.java:763) ~[?:1.8.0_112]
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 
~[?:1.8.0_112]
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) 
~[?:1.8.0_112]
at java.net.URLClassLoader.access$100(URLClassLoader.java:73) ~[?:1.8.0_112]
at java.net.URLClassLoader$1.run(URLClassLoader.java:368) ~[?:1.8.0_112]
at java.net.URLClassLoader$1.run(URLClassLoader.java:362) ~[?:1.8.0_112]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_112]
at java.net.URLClassLoader.findClass(URLClassLoader.java:361) ~[?:1.8.0_112]
at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:71)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
 [flink-dist_2.11-1.13.2.jar:1.13.2]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) [?:1.8.0_112]
at io.debezium.relational.Column.editor(Column.java:31) 
[blob_p-343be43b7874de49f6ce1d8bcb6a90a384203530-2e5afb6f8bf4834164c1bb92aaf97a00:2.2.0]
at 
io.debezium.connector.postgresql.connection.PostgresConnection.readTableColumn(PostgresConnection.java:464)
 
[blob_p-343be43b7874de49f6ce1d8bcb6a90a384203530-2e5afb6f8bf4834164c1bb92aaf97a00:2.2.0]
at 
io.debezium.jdbc.JdbcConnection.getColumnsDetails(JdbcConnection.java:1226) 
[blob_p-343be43b7874de49f6ce1d8bcb6a90a384203530-2e5afb6f8bf4834164c1bb92aaf97a00:2.2.0]
at io.debezium.jdbc.JdbcConnection.readSchema(JdbcConnection.java:1182) 
[blob_p-343be43b7874de49f6ce1d8bcb6a90a384203530-2e5afb6f8bf4834164c1bb92aaf97a00:2.2.0]
at 
io.debezium.connector.postgresql.PostgresSchema.refresh(PostgresSchema.java:100)
 
[blob_p-343be43b7874de49f6ce1d8bcb6a90a384203530-2e5afb6f8bf4834164c1bb92aaf97a00:2.2.0]
at 
io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource.connectionCreated(PostgresSnapshotChangeEventSource.java:95)
 
[blob_p-343be43b7874de49f6ce1d8bcb6a90a384203530-2e5afb6f8bf4834164c1bb92aaf97a00:2.2.0]
at 
io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:103)
 
[blob_p-343be43b7874de49f6ce1d8bcb6a90a384203530-2e5afb6f8bf4834164c1bb92aaf97a00:2.2.0]




weishishuo...@163.com
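Not from the thread: the error text above names the two possible causes itself, (a) metaspace simply too small and (b) a class-loading leak that accumulates across job (re)submissions. A hedged flink-conf.yaml sketch for the first case (the value is illustrative; the default in Flink 1.13 is 256m):

# flink-conf.yaml -- illustrative value only
# raises the TaskManager JVM metaspace (default 256m in Flink 1.13)
taskmanager.memory.jvm-metaspace.size: 512m
# if the error returns after several job (re)submissions, suspect a class-loading leak instead,
# e.g. CDC/JDBC driver classes kept alive by the user-code classloader; moving such jars into lib/ is often tried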


Re: Re: Data loss when Flink writes to ClickHouse

2022-06-02 Post by lxk7...@163.com

Our job as a whole runs normally, there have been no errors, and checkpointing is enabled.
Looking into it today, I found an existing Flink issue that resembles my case: [FLINK-23721] Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend - ASF JIRA (apache.org). The main cause is that my SQL uses a group by with a TTL configured, but the TTL does not take effect with the RocksDB state backend, so managed memory usage fills up.
My current workaround is to use the FsStateBackend; so far managed memory looks fine, and I will keep watching the overall data-volume difference.


lxk7...@163.com
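Not from the thread: a minimal sketch of the two settings being discussed above, the idle state retention ("state TTL") for the SQL group by and the state backend switch. The APIs shown are the Flink 1.13-era ones; exact method availability should be checked against the version in use.

import java.time.Duration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Heap-based keyed state with checkpoints on a filesystem (the workaround described above).
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Idle state retention for the unbounded group by; the value is illustrative.
tEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10));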
 
From: yue ma
Sent: 2022-06-02 15:05
To: user-zh
Subject: Re: Data loss when Flink writes to ClickHouse
Hello, you could first check whether checkpointing is enabled for your job, and whether any failover occurred while it was running.
 
lxk  于2022年6月2日周四 11:38写道:
 
> 各位,请教个问题
> 目前使用flink往ck写入数据,使用的是datastream
> api以及rocksdb状态后端,程序中了开了两个窗口,都是10秒级别。同时还使用了sql进行group by
> 求和,求和的操作没有加窗口,同时streamtableenv 设置了状态生存时间为10s.
> 在跟离线端对比数据的时候发现,一段时间内的数据跟离线差异不大,从0点-17点(数据的事件时间),但是18点(事件时间)以后的数据实时端差异量特别大。
> 目前在webui上发现整个管理内存的使用率已经占满,不知道是否跟这个有关系。
>
> 还有一点现象是,今天的数据我们对比了ck上实时的表(正确的),总体数据量还是要小很多。但是当我从零点重新消费,目前来看今天的数据能够对上,不知道是否是因为程序运行一段时间后,整个管理内存都被占满了,从而导致原本缓存的数据丢失了。
> 以下是相应的算子链和整个tm内存情况。出现反压是因为从今天0点重新开始消费了。
>
>
>
>


A learning question about how Flink SQL aggregate functions are implemented

2022-06-01 Post by hdxg1101300...@163.com
Hello:
   While using Flink SQL recently (currently Flink 1.12.4), a question came up:
   Take a SQL statement like this:
 select 
dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- compute the uv count
count(distinct user_id) as uv,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) 
* 1000  as window_start
from source_table
group by
dim,
tumble(row_time, interval '1' minute);
It aggregates over a given window size and dimension, and aggregate functions such as count(*), sum(price), max(price) can be combined freely;
If I do this kind of aggregation with the DataStream API, how do I implement several aggregate computations at once? The API's aggregate method, aggregate(AggregateFunction aggFunction, WindowFunction windowFunction), takes one aggregate function and one window function; how can they be combined flexibly? Or how does Flink SQL implement this? I'd like to read that part of the code but don't know where to start. I'd appreciate some pointers.
Thanks!


hdxg1101300...@163.com
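Not from the thread: one way this is usually done on the DataStream side is a single AggregateFunction whose accumulator carries all the partial results at once (pv, sum, max, min, and a set for the distinct uv). A minimal sketch under that assumption; the Event record and its field names are illustrative, not from the original mail:

import java.util.HashSet;
import java.util.Set;
import org.apache.flink.api.common.functions.AggregateFunction;

public class MultiAggExample {

    // Illustrative input record.
    public static class Event {
        public String dim;
        public String userId;
        public double price;
    }

    // One accumulator carrying all partial aggregates at once.
    public static class Acc {
        long pv;
        double sum;
        double max = Double.NEGATIVE_INFINITY;
        double min = Double.POSITIVE_INFINITY;
        Set<String> users = new HashSet<>();   // for count(distinct user_id)
    }

    public static class MultiAggFunction implements AggregateFunction<Event, Acc, String> {
        @Override
        public Acc createAccumulator() { return new Acc(); }

        @Override
        public Acc add(Event e, Acc acc) {
            acc.pv++;
            acc.sum += e.price;
            acc.max = Math.max(acc.max, e.price);
            acc.min = Math.min(acc.min, e.price);
            acc.users.add(e.userId);
            return acc;
        }

        @Override
        public String getResult(Acc acc) {
            return acc.pv + "," + acc.sum + "," + acc.max + "," + acc.min + "," + acc.users.size();
        }

        @Override
        public Acc merge(Acc a, Acc b) {
            a.pv += b.pv;
            a.sum += b.sum;
            a.max = Math.max(a.max, b.max);
            a.min = Math.min(a.min, b.min);
            a.users.addAll(b.users);
            return a;
        }
    }
}

Used roughly as stream.keyBy(e -> e.dim).window(TumblingEventTimeWindows.of(Time.minutes(1))).aggregate(new MultiAggFunction(), yourWindowFunction), where the optional window function only attaches the window start time, mirroring window_start in the SQL. Flink SQL does something similar internally: roughly speaking, the planner generates one aggregation handler whose state holds one accumulator per aggregate call, which is the part of the code worth reading for the original question.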


Re: Question about using Flink async I/O

2022-05-26 Post by lxk7...@163.com
Resending the images:

https://sm.ms/image/12XQHAOZdYoqraC
https://sm.ms/image/zJ2gfmxvSc85Xl7



lxk7...@163.com
 
From: lxk7...@163.com
Sent: 2022-05-26 20:54
To: user-zh
Subject: Question about using Flink async I/O

I'm using async I/O in my program, but it doesn't seem to recognize this list-typed data.

lxk7...@163.com


Question about using Flink async I/O

2022-05-26 Post by lxk7...@163.com

I'm using async I/O in my program, but it doesn't seem to recognize this list-typed data.

lxk7...@163.com
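Not from the thread (the attached images did not survive), but for reference a minimal AsyncDataStream sketch. One frequent stumbling point with list-typed results is that ResultFuture.complete expects a Collection of output elements, so a single list result must either be emitted as one element whose type is the list, or be flattened into that collection. All names here are illustrative:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class LookupAsyncFunction extends RichAsyncFunction<String, String> {
    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        CompletableFuture
                .supplyAsync(() -> "value-for-" + key)          // stand-in for a real async client call
                .thenAccept(v -> resultFuture.complete(Collections.singletonList(v)));
    }
}

// DataStream<String> input = ...;
// DataStream<String> enriched = AsyncDataStream.unorderedWait(
//         input, new LookupAsyncFunction(), 5, TimeUnit.SECONDS, 100);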


Re: Re: [Internet]Re: Re: Some question with Flink state

2022-05-24 Post by lxk7...@163.com

I just read up on how key groups work, and I can roughly follow the earlier part now. Regarding this sentence:
"with map-state, the keys inside certain fixed key groups can each be stored separately via the map-state's user key"
my understanding is that with value-state a task holds multiple keys and the contents for different keys overwrite each other, whereas with a map, even if the same task has multiple keys, the entries can still be looked up by the user-defined key.
If that's the case, then most scenarios are actually better served by map-state.


lxk7...@163.com
 
From: jurluo(罗凯)
Date: 2022-05-25 11:05
To: user-zh@flink.apache.org
Subject: Re: [Internet]Re: Re: Some question with Flink state
Buddy, this actually looks fine: records with the same key are all assigned to the same task, and it is normal for each task to hold several different keys. Keys are partitioned into key groups according to the max parallelism, and each fixed key group is assigned to a task. The only guarantee is that identical keys go to the same task, not that a task holds only one key. For your requirement, map-state is the right fit: within the fixed key groups that land on a task, each key's data can be stored separately under the map-state's user key.
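Not from the thread: the assignment described above can be checked directly with Flink's own utility, which is a quick way to see why several keys share one subtask. A small sketch (the keys and parallelism values are arbitrary):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

int maxParallelism = 128;   // Flink's default max parallelism
int parallelism = 10;       // the operator parallelism used in the test below

for (String key : new String[] {"1", "2", "3"}) {
    int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
    int subtask  = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism);
    // Same key -> always the same key group and subtask; different keys may well share a subtask.
    System.out.println(key + " -> keyGroup " + keyGroup + ", subtask " + subtask);
}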
 
> 2022年5月25日 上午10:45,lxk7...@163.com 写道:
> 
> 图片好像又挂了  我重发下
> hello,我这边测试了一下发现一个问题,在使用String类型做keyby的时候并没有得到正确的结果,而使用int类型的时候结果是正确。而且测试发现两次keyby确实是以第二次keyby为准
> 
> 
> 
>下面是我的代码及测试结果
> 
> 
> 
> 一.使用int类型
> 
> 
> 
>public class KeyByTest {
> 
> 
> 
> public static void main(String[] args) throws Exception {
> 
> 
> 
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> 
> 
> 
> env.setParallelism(10);
> 
> 
> 
> 
> 
> DataStreamSource dataDataStreamSource = 
> env.fromCollection(Arrays.asList(new data(1, "123", "首页"),
> 
> 
> 
> new data(1, "123", "分类页"),
> 
> 
> 
> new data(2, "r-123", "搜索结果页"),
> 
> 
> 
> new data(1, "r-123", "我的页"),
> 
> 
> 
> new data(3, "r-4567", "搜索结果页")));
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> SingleOutputStreamOperator map = 
> dataDataStreamSource.keyBy(new MyKeySelector())
> 
> 
> 
> .map(new RichMapFunction() {
> 
> 
> 
> 
> 
> @Override
> 
> 
> 
> public String map(data data) throws Exception {
> 
> 
> 
> System.out.println(data.toString() + "的subtask为:" + 
> getRuntimeContext().getIndexOfThisSubtask() );
> 
> 
> 
> return data.toString();
> 
> 
> 
> }
> 
> 
> 
> });
> 
> 
> 
> 
> 
> 
> 
> env.execute("test");
> 
> 
> 
> 
> 
> }
> 
> 
> 
> }
> 
> 
> 
> class data{
> 
> 
> 
> private int id;
> 
> 
> 
> private String goods;
> 
> 
> 
> private String pageName;
> 
> 
> 
> 
> 
> public data(int id, String goods, String pageName) {
> 
> 
> 
> this.id = id;
> 
> 
> 
> this.goods = goods;
> 
> 
> 
> this.pageName = pageName;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> 
> 
> public data() {
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public int getId() {
> 
> 
> 
> return id;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public void setId(int id) {
> 
> 
> 
> this.id = id;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public String getGoods() {
> 
> 
> 
> return goods;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public void setGoods(String goods) {
> 
> 
> 
> this.goods = goods;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public String getPageName() {
> 
> 
> 
> return pageName;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public void setPageName(String pageName) {
> 
> 
> 
> this.pageName = pageName;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> @Override
> 
> 
> 
> public String toString() {
> 
> 
> 
> return "data{" +
> 
> 
> 
> "id='" + id + '\'' +
> 
> 
> 
> ", goods='" + goods + '\'' +
> 
> 
> 
> ", pageName='" + pageName + '\'' +
> 
> 
> 
> '}';
> 
> 
> 
> }
> 
> 
> 
> }
> 
> 
> 
> 
> 
> class MyKeySelector implements KeySelector{
> 
> 
> 
> 
> 
> @Override
> 
> 
> 
> public Integer getKey(data data) throws Exception {
> 
> 
> 
> return data.getId();
> 
> 
> 
> }
> 
> 
> 
> }
> 
> 
> 
> 控制台的输出如下:
> https://s2.loli.net/2022/05/25/mxtZu9YAPN2FD1a.png
> 
> 
&g

Re: Re: Some question with Flink state

2022-05-24 Post by lxk7...@163.com
The images seem to be broken again, resending.
Hello, I ran some tests and found a problem: when keyBy uses a String key the result is not correct, while with an int key it is. The tests also show that with two keyBy calls, the second keyBy is indeed the one that takes effect.



   下面是我的代码及测试结果



    一.使用int类型



   public class KeyByTest {



    public static void main(String[] args) throws Exception {



    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();



    env.setParallelism(10);





    DataStreamSource dataDataStreamSource = 
env.fromCollection(Arrays.asList(new data(1, "123", "首页"),



    new data(1, "123", "分类页"),



    new data(2, "r-123", "搜索结果页"),



    new data(1, "r-123", "我的页"),



    new data(3, "r-4567", "搜索结果页")));











    SingleOutputStreamOperator map = dataDataStreamSource.keyBy(new 
MyKeySelector())



    .map(new RichMapFunction() {





    @Override



    public String map(data data) throws Exception {



    System.out.println(data.toString() + "的subtask为:" + 
getRuntimeContext().getIndexOfThisSubtask() );



    return data.toString();



    }



    });







    env.execute("test");





    }



}



class data{



    private int id;



    private String goods;



    private String pageName;





    public data(int id, String goods, String pageName) {



    this.id = id;



    this.goods = goods;



    this.pageName = pageName;



    }







    public data() {



    }





    public int getId() {



    return id;



    }





    public void setId(int id) {



    this.id = id;



    }





    public String getGoods() {



    return goods;



    }





    public void setGoods(String goods) {



    this.goods = goods;



    }





    public String getPageName() {



    return pageName;



    }





    public void setPageName(String pageName) {



    this.pageName = pageName;



    }





    @Override



    public String toString() {



    return "data{" +



    "id='" + id + '\'' +



    ", goods='" + goods + '\'' +



    ", pageName='" + pageName + '\'' +



    '}';



    }



}





class MyKeySelector implements KeySelector{





    @Override



    public Integer getKey(data data) throws Exception {



    return data.getId();



    }



}



控制台的输出如下:
https://s2.loli.net/2022/05/25/mxtZu9YAPN2FD1a.png



可以看见数据根据id分组,分到了不同的subtask上。







二.使用String类型  代码如下:



public class KeyByTest {



    public static void main(String[] args) throws Exception {



    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();



    env.setParallelism(10);





    DataStreamSource dataDataStreamSource = 
env.fromCollection(Arrays.asList(new data("1", "123", "首页"),



    new data("1", "123", "分类页"),



    new data("2", "r-123", "搜索结果页"),



    new data("2", "r-123", "我的页"),



    new data("3", "r-4567", "搜索结果页")));











    SingleOutputStreamOperator map = dataDataStreamSource.keyBy(new 
MyKeySelector())



    .map(new RichMapFunction() {





    @Override



    public String map(data data) throws Exception {



    System.out.println(data.toString() + "的subtask为:" + 
getRuntimeContext().getIndexOfThisSubtask() );



    return data.toString();



    }



    });







    env.execute("test");





    }



}



class data{



    private String id;



    private String goods;



    private String pageName;





    public data(String id, String goods, String pageName) {



    this.id = id;



    this.goods = goods;



    this.pageName = pageName;



    }







    public data() {



    }





    public String getId() {



    return id;



    }





    public void setId(String id) {



    this.id = id;



    }





    public String getGoods() {



    return goods;



    }





    public void setGoods(String goods) {



    this.goods = goods;



    }





    public String getPageName() {



    return pageName;



    }





    public void setPageName(String pageName) {



    this.pageName = pageName;



    }





    @Override



    public String toString() {



    return "data{" +



    "id='" + id + '\'' +



    ", goods='

Re: Re: Some question with Flink state

2022-05-24 Post by lxk7...@163.com
Hello, I ran some tests and found a problem: when keyBy uses a String key the result is not correct, while with an int key it is. The tests also show that with two keyBy calls, the second keyBy is indeed the one that takes effect.
   Below are my code and test results.
1. Using an int key
   public class KeyByTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);

DataStreamSource<data> dataDataStreamSource = 
env.fromCollection(Arrays.asList(new data(1, "123", "首页"),
new data(1, "123", "分类页"),
new data(2, "r-123", "搜索结果页"),
new data(1, "r-123", "我的页"),
new data(3, "r-4567", "搜索结果页")));




SingleOutputStreamOperator<String> map = dataDataStreamSource.keyBy(new 
MyKeySelector())
.map(new RichMapFunction<data, String>() {

@Override
public String map(data data) throws Exception {
System.out.println(data.toString() + "的subtask为:" + 
getRuntimeContext().getIndexOfThisSubtask() );
return data.toString();
}
});


env.execute("test");

}
}
class data{
private int id;
private String goods;
private String pageName;

public data(int id, String goods, String pageName) {
this.id = id;
this.goods = goods;
this.pageName = pageName;
}


public data() {
}

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

public String getGoods() {
return goods;
}

public void setGoods(String goods) {
this.goods = goods;
}

public String getPageName() {
return pageName;
}

public void setPageName(String pageName) {
this.pageName = pageName;
}

@Override
public String toString() {
return "data{" +
"id='" + id + '\'' +
", goods='" + goods + '\'' +
", pageName='" + pageName + '\'' +
'}';
}
}

class MyKeySelector implements KeySelector<data, Integer>{

@Override
public Integer getKey(data data) throws Exception {
return data.getId();
}
}
The console output is as follows:
You can see that the data is grouped by id and distributed to different subtasks.


2. Using a String key -- code as follows:
public class KeyByTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);

DataStreamSource<data> dataDataStreamSource = 
env.fromCollection(Arrays.asList(new data("1", "123", "首页"),
new data("1", "123", "分类页"),
new data("2", "r-123", "搜索结果页"),
new data("2", "r-123", "我的页"),
new data("3", "r-4567", "搜索结果页")));




SingleOutputStreamOperator<String> map = dataDataStreamSource.keyBy(new 
MyKeySelector())
.map(new RichMapFunction<data, String>() {

@Override
public String map(data data) throws Exception {
System.out.println(data.toString() + "的subtask为:" + 
getRuntimeContext().getIndexOfThisSubtask() );
return data.toString();
}
});


env.execute("test");

}
}
class data{
private String id;
private String goods;
private String pageName;

public data(String id, String goods, String pageName) {
this.id = id;
this.goods = goods;
this.pageName = pageName;
}


public data() {
}

public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

public String getGoods() {
return goods;
}

public void setGoods(String goods) {
this.goods = goods;
}

public String getPageName() {
return pageName;
}

public void setPageName(String pageName) {
this.pageName = pageName;
}

@Override
public String toString() {
return "data{" +
"id='" + id + '\'' +
", goods='" + goods + '\'' +
", pageName='" + pageName + '\'' +
'}';
}
}

class MyKeySelector implements KeySelector<data, String>{

@Override
public String getKey(data data) throws Exception {
return data.getId();
}
}
The final console output is as follows:


You can see that only two groups were produced; I'm not sure whether this is a bug.


lxk7...@163.com
 
From: Xuyang
Date: 2022-05-24 21:35
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
I'm not sure

Re: Re: Some question with Flink state

2022-05-24 Post by lxk7...@163.com
If the problem is the double keyBy, I could simply concatenate the two fields into one string inside a single keyBy; would that have the same effect as keying by twice?



lxk7...@163.com
 
From: Xuyang
Date: 2022-05-24 20:51
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
It looks like you keyBy twice; you could define a single custom KeySelector to replace the two. Also, if you're worried that identical keys are not routed to the same parallel instance, you can have the operator at some parallelism print each record together with its subtask index and debug from there.
在 2022-05-24 20:43:19,"lxk7...@163.com"  写道:
>
>https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
>https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png
>
>这样呢
>
>
>lxk7...@163.com
> 
>From: Xuyang
>Date: 2022-05-24 20:17
>To: user-zh
>Subject: Re:Re: Re: Some question with Flink state
>Hi, 你的图还是挂了,可以使用图床工具试一下
> 
> 
> 
>在 2022-05-24 13:50:34,"lxk7...@163.com"  写道:
> 
>图片好像有点问题,重新上传一下
>lxk7...@163.com
>From: Hangxiang Yu
>Date: 2022-05-24 12:09
>To: user-zh
>Subject: Re: Re: Some question with Flink state
>你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
>selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
>或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
>On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
>> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>>
>> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>>
>> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>>
>>
>>
>> lxk7...@163.com
>>
>> From: Hangxiang Yu
>> Date: 2022-05-23 23:09
>> To: user-zh; lxk7491
>> Subject: Re: Some question with Flink state
>> Hello,
>> All states will not be shared in different parallelisms.
>> BTW, English questions could be sent to u...@flink.apache.org.
>>
>> Best,
>> Hangxiang.
>>
>> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>>
>> >
>> > Hi everyone
>> >I was used Flink keyed-state in my Project.But I found some questions
>> > that make me confused.
>> >when I used value-state in multi parallelism  the value is not I
>> wanted.
>> >So I guess that value-state is in every parallelism. every parallelism
>> > saved their only value  which means the value is Thread-Level
>> >But when I used map-state,the value is correctly. I mean the map-state
>> > was shared by every parallelism.
>> >   looking forward to your reply
>> >
>> >
>> > lxk7...@163.com
>> >
>>


Re: Re: Some question with Flink state

2022-05-24 Post by lxk7...@163.com
OK, I'll give that a try.


lxk7...@163.com
 
From: Xuyang
Date: 2022-05-24 20:51
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
看起来你keyby了两次,可以自定义一个keyselector来替代这两个。另外如果担心相同key没有被分到同一个并行度时,可以在某个并行度的算子下将数据和该subtask的并行度index打出来,debug调查下
在 2022-05-24 20:43:19,"lxk7...@163.com"  写道:
>
>https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
>https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png
>
>这样呢
>
>
>lxk7...@163.com
> 
>From: Xuyang
>Date: 2022-05-24 20:17
>To: user-zh
>Subject: Re:Re: Re: Some question with Flink state
>Hi, 你的图还是挂了,可以使用图床工具试一下
> 
> 
> 
>在 2022-05-24 13:50:34,"lxk7...@163.com"  写道:
> 
>图片好像有点问题,重新上传一下
>lxk7...@163.com
>From: Hangxiang Yu
>Date: 2022-05-24 12:09
>To: user-zh
>Subject: Re: Re: Some question with Flink state
>你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
>selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
>或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
>On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
>> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>>
>> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>>
>> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>>
>>
>>
>> lxk7...@163.com
>>
>> From: Hangxiang Yu
>> Date: 2022-05-23 23:09
>> To: user-zh; lxk7491
>> Subject: Re: Some question with Flink state
>> Hello,
>> All states will not be shared in different parallelisms.
>> BTW, English questions could be sent to u...@flink.apache.org.
>>
>> Best,
>> Hangxiang.
>>
>> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>>
>> >
>> > Hi everyone
>> >I was used Flink keyed-state in my Project.But I found some questions
>> > that make me confused.
>> >when I used value-state in multi parallelism  the value is not I
>> wanted.
>> >So I guess that value-state is in every parallelism. every parallelism
>> > saved their only value  which means the value is Thread-Level
>> >But when I used map-state,the value is correctly. I mean the map-state
>> > was shared by every parallelism.
>> >   looking forward to your reply
>> >
>> >
>> > lxk7...@163.com
>> >
>>


Re: Re: Some question with Flink state

2022-05-24 Post by lxk7...@163.com

https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png

How about this?


lxk7...@163.com
 
From: Xuyang
Date: 2022-05-24 20:17
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
Hi, your images are still broken; you could try an image-hosting tool.
 
 
 
在 2022-05-24 13:50:34,"lxk7...@163.com"  写道:
 
图片好像有点问题,重新上传一下
lxk7...@163.com
From: Hangxiang Yu
Date: 2022-05-24 12:09
To: user-zh
Subject: Re: Re: Some question with Flink state
你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>
> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>
> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>
>
>
> lxk7...@163.com
>
> From: Hangxiang Yu
> Date: 2022-05-23 23:09
> To: user-zh; lxk7491
> Subject: Re: Some question with Flink state
> Hello,
> All states will not be shared in different parallelisms.
> BTW, English questions could be sent to u...@flink.apache.org.
>
> Best,
> Hangxiang.
>
> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>
> >
> > Hi everyone
> >I was used Flink keyed-state in my Project.But I found some questions
> > that make me confused.
> >when I used value-state in multi parallelism  the value is not I
> wanted.
> >So I guess that value-state is in every parallelism. every parallelism
> > saved their only value  which means the value is Thread-Level
> >But when I used map-state,the value is correctly. I mean the map-state
> > was shared by every parallelism.
> >   looking forward to your reply
> >
> >
> > lxk7...@163.com
> >
>


Re: Re: Some question with Flink state

2022-05-24 文章 lxk7...@163.com
[URL=https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png][IMG]https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png[/IMG][/URL]
[URL=https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png][IMG]https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png[/IMG][/URL]
看下这个是否能看见图片


lxk7...@163.com
 
From: Xuyang
Date: 2022-05-24 20:17
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
Hi, 你的图还是挂了,可以使用图床工具试一下
 
 
 
在 2022-05-24 13:50:34,"lxk7...@163.com"  写道:
 
图片好像有点问题,重新上传一下
lxk7...@163.com
From: Hangxiang Yu
Date: 2022-05-24 12:09
To: user-zh
Subject: Re: Re: Some question with Flink state
你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>
> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>
> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>
>
>
> lxk7...@163.com
>
> From: Hangxiang Yu
> Date: 2022-05-23 23:09
> To: user-zh; lxk7491
> Subject: Re: Some question with Flink state
> Hello,
> All states will not be shared in different parallelisms.
> BTW, English questions could be sent to u...@flink.apache.org.
>
> Best,
> Hangxiang.
>
> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>
> >
> > Hi everyone
> >I was used Flink keyed-state in my Project.But I found some questions
> > that make me confused.
> >when I used value-state in multi parallelism  the value is not I
> wanted.
> >So I guess that value-state is in every parallelism. every parallelism
> > saved their only value  which means the value is Thread-Level
> >But when I used map-state,the value is correctly. I mean the map-state
> > was shared by every parallelism.
> >   looking forward to your reply
> >
> >
> > lxk7...@163.com
> >
>


Re: Re: Some question with Flink state

2022-05-23 文章 lxk7...@163.com
图片好像有点问题,重新上传一下


lxk7...@163.com
 
From: Hangxiang Yu
Date: 2022-05-24 12:09
To: user-zh
Subject: Re: Re: Some question with Flink state
你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
 
On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
 
> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>
> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>
> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>
>
>
> lxk7...@163.com
>
> From: Hangxiang Yu
> Date: 2022-05-23 23:09
> To: user-zh; lxk7491
> Subject: Re: Some question with Flink state
> Hello,
> All states will not be shared in different parallelisms.
> BTW, English questions could be sent to u...@flink.apache.org.
>
> Best,
> Hangxiang.
>
> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>
> >
> > Hi everyone
> >I was used Flink keyed-state in my Project.But I found some questions
> > that make me confused.
> >when I used value-state in multi parallelism  the value is not I
> wanted.
> >So I guess that value-state is in every parallelism. every parallelism
> > saved their only value  which means the value is Thread-Level
> >But when I used map-state,the value is correctly. I mean the map-state
> > was shared by every parallelism.
> >   looking forward to your reply
> >
> >
> > lxk7...@163.com
> >
>


Re: Re: Some question with Flink state

2022-05-23 文章 lxk7...@163.com
以下是我的代码部分




这是最新的一版,根据测试的时候没有啥问题
但是之前使用value state的时候能从数据上看出不对


lxk7...@163.com
 
From: Hangxiang Yu
Date: 2022-05-24 12:09
To: user-zh
Subject: Re: Re: Some question with Flink state
你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
 
On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
 
> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>
> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>
> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>
>
>
> lxk7...@163.com
>
> From: Hangxiang Yu
> Date: 2022-05-23 23:09
> To: user-zh; lxk7491
> Subject: Re: Some question with Flink state
> Hello,
> All states will not be shared in different parallelisms.
> BTW, English questions could be sent to u...@flink.apache.org.
>
> Best,
> Hangxiang.
>
> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>
> >
> > Hi everyone
> >I was used Flink keyed-state in my Project.But I found some questions
> > that make me confused.
> >when I used value-state in multi parallelism  the value is not I
> wanted.
> >So I guess that value-state is in every parallelism. every parallelism
> > saved their only value  which means the value is Thread-Level
> >But when I used map-state,the value is correctly. I mean the map-state
> > was shared by every parallelism.
> >   looking forward to your reply
> >
> >
> > lxk7...@163.com
> >
>


Re: Re: Some question with Flink state

2022-05-23 文章 lxk7...@163.com
好的,我看这里面邮件都是英文,所以用英文问了个问题。
我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。



lxk7...@163.com
 
From: Hangxiang Yu
Date: 2022-05-23 23:09
To: user-zh; lxk7491
Subject: Re: Some question with Flink state
Hello,
All states will not be shared in different parallelisms.
BTW, English questions could be sent to u...@flink.apache.org.
 
Best,
Hangxiang.
 
On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
 
>
> Hi everyone
>I was used Flink keyed-state in my Project.But I found some questions
> that make me confused.
>when I used value-state in multi parallelism  the value is not I wanted.
>So I guess that value-state is in every parallelism. every parallelism
> saved their only value  which means the value is Thread-Level
>But when I used map-state,the value is correctly. I mean the map-state
> was shared by every parallelism.
>   looking forward to your reply
>
>
> lxk7...@163.com
>


Some question with Flink state

2022-05-23 文章 lxk7...@163.com

Hi everyone
   I was used Flink keyed-state in my Project.But I found some questions that 
make me confused.
   when I used value-state in multi parallelism  the value is not I wanted.
   So I guess that value-state is in every parallelism. every parallelism saved 
their only value  which means the value is Thread-Level 
   But when I used map-state,the value is correctly. I mean the map-state was 
shared by every parallelism.
  looking forward to your reply 


lxk7...@163.com
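上面问题里 value-state 看起来"没有按 key 分到同一个并行度",常见原因是 keyBy 的 KeySelector 返回了不稳定的 key(例如用了没有重写 hashCode/equals 的对象),而不是 ValueState 本身会跨并行度共享。下面是一个示意性的草稿(假设 key 是"用户ID_商品ID"拼成的字符串,Click、ClickCountFunction 等类名都是演示用的,并非原帖代码):同一个 key 的数据一定会落在同一个 subtask 上,ValueState 也是按 key 隔离的。

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// 演示用的点击事件类型(假设)
class Click {
    public String userId;
    public String itemId;
}

public class ClickCountFunction extends KeyedProcessFunction<String, Click, Tuple2<String, Long>> {

    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) throws Exception {
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("click-count", Long.class));
    }

    @Override
    public void processElement(Click click, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        // ValueState 按当前 key 隔离,同一 key 一定由同一个并行子任务处理
        Long current = countState.value();
        long next = current == null ? 1L : current + 1L;
        countState.update(next);
        out.collect(Tuple2.of(ctx.getCurrentKey(), next));
    }
}

// 使用示意:key 用确定性的字符串拼接,避免用未重写 hashCode/equals 的对象做 key
// clicks.keyBy(c -> c.userId + "_" + c.itemId)
//       .process(new ClickCountFunction())
//       .print();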


退订

2022-05-22 文章 xudongjun123...@163.com
退订



xudongjun123...@163.com


Re: Re: flink sql无法读取Hive映射的HBase表

2022-05-16 文章 18579099...@163.com
第一次弄,不知道这么写的对不对

https://issues.apache.org/jira/projects/FLINK/issues/FLINK-27604



18579099...@163.com
 
发件人: Jingsong Li
发送时间: 2022-05-13 15:06
收件人: user-zh
主题: Re: Re: flink sql无法读取Hive映射的HBase表
Hi, 推荐 https://www.deepl.com/translator
非常好用
 
我记得对Hive Custom Storage Handler(hbase)是有问题的
 
Best,
Jingsong
 
On Fri, May 13, 2022 at 2:12 PM 18579099...@163.com <18579099...@163.com>
wrote:
 
> 我英文能力不允许啊
>
>
>
> 18579099...@163.com
>
> 发件人: yuxia
> 发送时间: 2022-05-11 15:11
> 收件人: user-zh
> 主题: Re: flink sql无法读取Hive映射的HBase表
> 不好意思,我尝试复现你的问题,但是我没有 hbase 环境,不过看起来是只有当 STORED BY
>   'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 有问题?
> 我之后空了再debug 看看。
>
> 不过我看了一下 flink 这块的代码,从 flink 这块的代码来看,应该是 get 这个 hive 表之后,它的
> StorageDescriptor 的 inputformat 为 null,然后 Class.forName(inputformat) 就报错
> NPE了。
> 应该是这块代码有点问题。
> 如果你方便的话,可以辛苦帮忙建一个 jira~
> https://issues.apache.org/jira/projects/FLINK/summary
>
>
>
> Best regards,
> Yuxia
>
> - 原始邮件 -
> 发件人: 18579099...@163.com
> 收件人: "user-zh" 
> 发送时间: 星期二, 2022年 5 月 10日 上午 10:39:16
> 主题: Re: Re: flink sql无法读取Hive映射的HBase表
>
> 版本:
> flink:1.13.6
> hive:2.1.1-cdh6.2.0
> hbase:2.1.0-cdh6.2.0
> flinksql执行工具:flink sql client
> sql 提交模式:yarn-per-job
>
> -
> flink lib目录下的包
> antlr-runtime-3.5.2.jar
> flink-csv-1.13.6.jar
> flink-dist_2.11-1.13.6.jar
> flink-json-1.13.6.jar
> flink-shaded-zookeeper-3.4.14.jar
> flink-sql-connector-hive-2.2.0_2.11-1.13.6.jar
> flink-table_2.11-1.13.6.jar
> flink-table-blink_2.11-1.13.6.jar
> guava-14.0.1.jar
> hadoop-mapreduce-client-core-3.0.0-cdh6.2.0.jar
> hbase-client-2.1.0-cdh6.2.0.jar
> hbase-common-2.1.0-cdh6.2.0.jar
> hbase-protocol-2.1.0-cdh6.2.0.jar
> hbase-server-2.1.0-cdh6.2.0.jar
> hive-exec-2.1.1-cdh6.2.0.jar
> hive-hbase-handler-2.1.1-cdh6.2.0.jar
> htrace-core4-4.1.0-incubating.jar
> log4j-1.2-api-2.17.1.jar
> log4j-api-2.17.1.jar
> log4j-core-2.17.1.jar
> log4j-slf4j-impl-2.17.1.jar
> protobuf-java-2.5.0.jar
>
> 
> hive建表语句
> CREATE EXTERNAL TABLE `ods.student`(
>   `row_key` string,
>   `name` string,
>   `age` int,
>   `addr` string
> )
> ROW FORMAT SERDE
>   'org.apache.hadoop.hive.hbase.HBaseSerDe'
> STORED BY
>   'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
> WITH SERDEPROPERTIES (
>
> 'hbase.columns.mapping'=':key,FINAL:NAME,FINAL:AGE,FINAL:ADDR', 'serialization.format'='1')
> TBLPROPERTIES (
>   'hbase.table.name'='ODS:STUDENT') ;
> catalog:hive catalog
> sql: select * from ods.student;
> 我看了报错信息之后添加了一些jar包到flink lib下,之前报的错跟缺少依赖有关。现在又出现了新的错误。
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.connectors.hive.FlinkHiveException: Unable to instantiate
> the hadoop input format
>
> --
> 详细的堆栈
> org.apache.flink.table.client.gateway.SqlExecutionException: Could not
> execute SQL statement.
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:215)
> ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:235)
> ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:479)
> ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:412)
> ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$0(CliClient.java:327)
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_191]
> at
> org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:327)
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at
> org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
> [flink-sql-client_2.11-1.13.6.jar:1.13.6]
> at o

Re: Re: flink sql无法读取Hive映射的HBase表

2022-05-12 文章 18579099...@163.com
我英文能力不允许啊



18579099...@163.com
 
发件人: yuxia
发送时间: 2022-05-11 15:11
收件人: user-zh
主题: Re: flink sql无法读取Hive映射的HBase表
不好意思,我尝试复现你的问题,但是我没有 hbase 环境,不过看起来是只有当 STORED BY 
  'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 有问题?
我之后空了再debug 看看。
 
不过我看了一下 flink 这块的代码,从 flink 这块的代码来看,应该是 get 这个 hive 表之后,它的 StorageDescriptor 的 
inputformat 为 null,然后 Class.forName(inputformat) 就报错 NPE了。
应该是这块代码有点问题。
如果你方便的话,可以辛苦帮忙建一个 jira~
https://issues.apache.org/jira/projects/FLINK/summary
 
 
 
Best regards,
Yuxia
 
- 原始邮件 -
发件人: 18579099...@163.com
收件人: "user-zh" 
发送时间: 星期二, 2022年 5 月 10日 上午 10:39:16
主题: Re: Re: flink sql无法读取Hive映射的HBase表
 
版本:
flink:1.13.6
hive:2.1.1-cdh6.2.0
hbase:2.1.0-cdh6.2.0
flinksql执行工具:flink sql client 
sql 提交模式:yarn-per-job
-
flink lib目录下的包
antlr-runtime-3.5.2.jar
flink-csv-1.13.6.jar
flink-dist_2.11-1.13.6.jar
flink-json-1.13.6.jar
flink-shaded-zookeeper-3.4.14.jar
flink-sql-connector-hive-2.2.0_2.11-1.13.6.jar
flink-table_2.11-1.13.6.jar
flink-table-blink_2.11-1.13.6.jar
guava-14.0.1.jar
hadoop-mapreduce-client-core-3.0.0-cdh6.2.0.jar
hbase-client-2.1.0-cdh6.2.0.jar
hbase-common-2.1.0-cdh6.2.0.jar
hbase-protocol-2.1.0-cdh6.2.0.jar
hbase-server-2.1.0-cdh6.2.0.jar
hive-exec-2.1.1-cdh6.2.0.jar
hive-hbase-handler-2.1.1-cdh6.2.0.jar
htrace-core4-4.1.0-incubating.jar
log4j-1.2-api-2.17.1.jar
log4j-api-2.17.1.jar
log4j-core-2.17.1.jar
log4j-slf4j-impl-2.17.1.jar
protobuf-java-2.5.0.jar

hive建表语句
CREATE EXTERNAL TABLE `ods.student`(
  `row_key` string, 
  `name` string,
  `age` int,
  `addr` string 
) 
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.hbase.HBaseSerDe' 
STORED BY 
  'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
WITH SERDEPROPERTIES ( 
  
'hbase.columns.mapping'=':key,FINAL:NAME,FINAL:AGE,FINAL:ADDR', 'serialization.format'='1')
TBLPROPERTIES (
  'hbase.table.name'='ODS:STUDENT') ;
catalog:hive catalog 
sql: select * from ods.student;
我看了报错信息之后添加了一些jar包到flink lib下,之前报的错跟缺少依赖有关。现在又出现了新的错误。
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.connectors.hive.FlinkHiveException: Unable to instantiate the 
hadoop input format
--
详细的堆栈
org.apache.flink.table.client.gateway.SqlExecutionException: Could not execute 
SQL statement.
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:215)
 ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:235)
 ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
at org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:479) 
~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:412) 
~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$0(CliClient.java:327)
 [flink-sql-client_2.11-1.13.6.jar:1.13.6]
at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_191]
at 
org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:327)
 [flink-sql-client_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
 [flink-sql-client_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
 [flink-sql-client_2.11-1.13.6.jar:1.13.6]
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) 
[flink-sql-client_2.11-1.13.6.jar:1.13.6]
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) 
[flink-sql-client_2.11-1.13.6.jar:1.13.6]
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) 
[flink-sql-client_2.11-1.13.6.jar:1.13.6]
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) 
[flink-sql-client_2.11-1.13.6.jar:1.13.6]
Caused by: org.apache.flink.connectors.hive.FlinkHiveException: Unable to 
instantiate the hadoop input format
at 
org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createMRSplits(HiveSourceFileEnumerator.java:100)
 ~[flink-sql-connector-hive-2.2.0_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.connectors.hive.HiveSourceFileEnumerator.createInputSplits(HiveSourceFileEnumerator.java:71)
 ~[flink-sql-connector-hive-2.2.0_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.connectors.hive.HiveTableSource.lambda$getDataStream$1(HiveTableSource.java:212)
 ~[flink-sql-connector-hive-2.2.0_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.conn

Re: Re: flink sql无法读取Hive映射的HBase表

2022-05-09 文章 18579099...@163.com
al(StreamExecSink.java:114)
 ~[flink-table-blink_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
 ~[flink-table-blink_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:70)
 ~[flink-table-blink_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:69)
 ~[flink-table-blink_2.11-1.13.6.jar:1.13.6]
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[flink-dist_2.11-1.13.6.jar:1.13.6]
at scala.collection.Iterator$class.foreach(Iterator.scala:891) 
~[flink-dist_2.11-1.13.6.jar:1.13.6]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) 
~[flink-dist_2.11-1.13.6.jar:1.13.6]
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
~[flink-dist_2.11-1.13.6.jar:1.13.6]
at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
~[flink-dist_2.11-1.13.6.jar:1.13.6]
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
~[flink-dist_2.11-1.13.6.jar:1.13.6]
at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
~[flink-dist_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
 ~[flink-table-blink_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
 ~[flink-table-blink_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
 ~[flink-table_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:791)
 ~[flink-table_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1225)
 ~[flink-table_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeOperation$3(LocalExecutor.java:213)
 ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90)
 ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:213)
 ~[flink-sql-client_2.11-1.13.6.jar:1.13.6]
... 12 more







18579099...@163.com
 
发件人: yuxia
发送时间: 2022-05-10 09:32
收件人: user-zh
主题: Re: flink sql无法读取Hive映射的HBase表
用的是 Hive Catalog 吗? Hive connector 和 Hive 的版本 都是多少呢?
另外,详细堆栈贴一下。
 
Best regards,
Yuxia
 
- 原始邮件 -
发件人: 18579099...@163.com
收件人: "user-zh" 
发送时间: 星期一, 2022年 5 月 09日 下午 5:46:02
主题: flink sql无法读取Hive映射的HBase表
 
我有一部分表的数据是存在hbase上的,平时通过hive加载外部表的方式读取hbase的数据,我想通过flink sql读取hive表的方式
读取数据(不直接使用flink 读取hbase是我使用的catalog是hive,不用再写建表语句然后再查),当我用sql-client尝试的时候报错。
读取正常的hive是可以正常读取的,但是读取hive on hbase表却报
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.catalog.exceptions.CatalogException: Failed to get table 
schema from deserializer。
不知道有没有什么办法可以解决这个问题,使用spark引擎是可以读取到数据的。
 
 
 
 
18579099...@163.com


flink sql无法读取Hive映射的HBase表

2022-05-09 文章 18579099...@163.com
我有一部分表的数据是存在hbase上的,平时通过hive加载外部表的方式读取hbase的数据,我想通过flink sql读取hive表的方式
读取数据(不直接使用flink 读取hbase是我使用的catalog是hive,不用再写建表语句然后再查),当我用sql-client尝试的时候报错。
读取正常的hive是可以正常读取的,但是读取hive on hbase表却报
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.catalog.exceptions.CatalogException: Failed to get table 
schema from deserializer。
不知道有没有什么办法可以解决这个问题,使用spark引擎是可以读取到数据的。




18579099...@163.com
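如果暂时绕不开 Hive 映射 HBase 表的读取问题,一个可以考虑的临时办法是用 Flink 自带的 HBase connector 直接建表去读(下面只是按上文 Hive 表结构手写的示意 DDL:列族 FINAL、HBase 表名 ODS:STUDENT 取自上文,zookeeper 地址是假设值,并且需要 lib 下已有对应版本的 flink-sql-connector-hbase 包):

CREATE TABLE student_hbase (
  rowkey STRING,
  `FINAL` ROW<NAME STRING, AGE INT, ADDR STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'ODS:STUDENT',
  'zookeeper.quorum' = 'zk01:2181,zk02:2181,zk03:2181'
);

-- 查询时按列族展开
SELECT rowkey, `FINAL`.NAME, `FINAL`.AGE, `FINAL`.ADDR FROM student_hbase;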


【Could we support distribute by For FlinkSql】

2022-05-08 文章 lpengdr...@163.com
Hello:
Now we can't add a shuffle operation in a SQL job.
Sometimes, for example, I have a kafka-source (three partitions) with parallelism three, followed by a lookup-join function. I want to distribute the data by id so that it is spread evenly across the three parallel subtasks (the source may be heavily skewed).
In the DataStream API I can do it with keyBy(), but sadly I can do nothing like that when I use SQL.
Maybe we can support it like 'select id, f1, f2 from sourceTable distribute by id', the way SparkSQL does.

So that we can make the change shown in the picture in SQL mode;

    



lpengdr...@163.com
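在 SQL 里暂时没有 distribute by,一个有时会用的绕行思路是把这一段转成 DataStream 做 keyBy 再转回 Table(下面是示意草稿:表名 sourceTable、字段 id/f1/f2 沿用上文,id 假设为字符串;能否完全达到预期的打散效果还需要实际验证):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// 假设 sourceTable(kafka,三个分区)已经注册在 tEnv 里
Table source = tEnv.sqlQuery("SELECT id, f1, f2 FROM sourceTable");

// 转成 DataStream 后按 id keyBy,相当于想要的 distribute by id
DataStream<Row> keyed = tEnv.toAppendStream(source, Row.class)
        .keyBy(row -> String.valueOf(row.getField(0)));

// 再注册回临时视图,后面的 lookup join SQL 基于打散后的数据继续写
tEnv.createTemporaryView("distributedTable", keyed);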


web ui中能查看到job失败的原因吗?

2022-04-21 文章 weishishuo...@163.com

我提交一个postgresql cdc 同步数据到 mysql jdbc sink的job,过了一会儿就失败了,点击job的链接,web 
ui界面的状态是FAILED,但是异常信息不明确
```
2022-04-21 17:30:50
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
...
Caused by: org.apache.flink.util.FlinkException: Execution 
d0dfc8446e24da751e93560c07f5d7f3 is unexpectedly no longer running on task 
executor 192.168.211.4:9730-fa3d22.
at 
org.apache.flink.runtime.jobmaster.JobMaster$1.onMissingDeploymentsOf(JobMaster.java:248)
... 34 more
```
不知道root cause是什么,web 界面是否可以查到呢?还是到哪里去查看呢?




weishishuo...@163.com


flink-connector和flink-sql-connector的区别

2022-04-21 文章 weishishuo...@163.com
cdc项目中每种connector都分成flink-connector-xxx和flink-sql-connector-xxx,比如flink-connector-mysql-cdc和flink-sql-connector-mysql-cdc,这两个的区别是什么呢?在什么场景下用前者,什么场景下用后者?




weishishuo...@163.com


回复: flink发布集群端运行,如何在sink或source中使用springboot的bean

2022-04-18 文章 wch...@163.com
如果需要在flink使用spring的话, 需要在open方法加载applicationContext对象. 
你这里需要在sink的open方法初始化spring上下文对象. 

  override def open(conf: Configuration): Unit = {
super.open(conf)
if (Option(SpringContextHolder.getApplicationContext).isEmpty) {
  SpringContextHolder.startupApplicationContext(SelectStockJob.getClass, 
profiles)
}
repository = SpringContextHolder.getBean(classOf[xxxRepository])
  }



wch...@163.com
 
发件人: 676360...@qq.com.INVALID
发送时间: 2022-04-18 14:28
收件人: user-zh@flink.apache.org
主题: flink发布集群端运行,如何在sink或source中使用springboot的bean
您好:
首先很感谢您能在百忙之中看到我的邮件。在使用flink框架过程中我遇到了一些问题,希望能得到您的解答。

我通过网上已有的资料进行学习,在本地环境将springboot框架与flink进行结合,并可以成功运行。但是当我将项目通过maven打包成jar包后,发布到flink集群端时,在自定义sink和source类中无法获取到springboot的ApplicationContext,所以我想问下针对此情况是否有解决方案。
下面是我代码的具体实现思路:
1.通过实现Springboot的CommandLineRunner的run方法来起到等同于main方法的作用
@Component
@Slf4j
public class InitRunner implements CommandLineRunner {
 
@Autowired
private Constant constant;
 
@Override
public void run(String... args) throws Exception {
//初始化启动
log.error("初始化启动");
start();
}
 
private void start() throws Exception {
String actives = constant.getActive();
if (StrUtil.isNotBlank(actives)){
String[] active = actives.split(",");
for (String act : active) {
Class cls = 
Class.forName(constant.getProperty("streaming.tasks." + act + ".package"));
AbstractStreamingTask streamingTask = (AbstractStreamingTask) 
cls
.getConstructor(new 
Class[]{String.class,Constant.class})
.newInstance(new Object[]{act, constant});
System.err.println("1---");
//此方法为flink调用
streamingTask.streaming();
}
}
}
}

2.自定义类实现ApplicationContextAware接口,获得ApplicationContext的值,并用static修饰,准备在自定义sink和source类中通过该值获得相关的bean
 
@Component
@Slf4j
public class SpringApplicationContext implements ApplicationContextAware, 
Serializable {
 
private static ApplicationContext applicationContext;
 
@Override
public void setApplicationContext(ApplicationContext applicationContext) 
throws BeansException {
log.error("context先执行");
if (this.applicationContext == null) {
synchronized (SpringApplicationContext.class){
if (this.applicationContext == null){
this.applicationContext = applicationContext;
}
}
}
}
 
 
public static Object getBean(String name) {
log.error("获取bean:"+name);
return applicationContext.getBean(name);
}
}
3.在自定义sink类中,通过ApplicationContext获取我需要的bean
 
@Slf4j
public class MysqlSink extends RichSinkFunction implements 
BaseSink, Serializable {
 
private BaseService baseService;
 
private Constant constant;
 
private String active;
 
public MysqlSink(String active) {
this.active = active;
}
 
@Override
public void open(Configuration parameters) throws Exception {
log.info("--open mysqlSink");
super.open(parameters);
init();
}
 
@Override
public void invoke(Test value, Context context) throws Exception {
log.info("--invoke mysqlSink");
if (value != null){
baseService.operate(value);
}
}
 
@Override
public SinkFunction[] getSinkFunction() {
return new SinkFunction[]{this};
}
 
private void init(){
if (constant == null){
constant = (Constant) SpringApplicationContext.getBean("constant");
}
if (baseService == null){
baseService = (BaseService) 
SpringApplicationContext.getBean(constant.getProperty("streaming.tasks."+active+".sink.mysql.service"));
}
}
}

经过我的测试,在发布到flink集群端并启动jar包时,applicationContext是可以获得正常的值得,但是在sink类中,值变为null。针对此种情况希望能从您那里获得相应解决方案,十分感谢。
 
 
676360...@qq.com
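按上面回复的思路,关键是在算子的 open() 里(也就是 TaskManager 端)再初始化 Spring 上下文,而不是依赖客户端 main 方法里注入的静态变量。下面是一个 Java 版的示意草稿(SpringContextHolder 即上面回复里用户自己的工具类,MyFlinkJob、"prod" 等参数为假设值,Test、BaseService 沿用上文):

public class MysqlSink extends RichSinkFunction<Test> {

    private transient BaseService baseService;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // 在 TaskManager 端首次打开算子时再初始化 Spring 上下文
        if (SpringContextHolder.getApplicationContext() == null) {
            SpringContextHolder.startupApplicationContext(MyFlinkJob.class, "prod");
        }
        baseService = SpringContextHolder.getBean(BaseService.class);
    }

    @Override
    public void invoke(Test value, Context context) throws Exception {
        if (value != null) {
            baseService.operate(value);
        }
    }
}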


使用问题咨询:Postgres 数据库作为 Catalog 时如何设置一些其他参数

2022-03-22 文章 17610801...@163.com

使用 Postgres 数据库作为 Catalog 
时如何设置一些其他参数,例如sink.buffer-flush.interval,sink.buffer-flush.max-rows

17610801...@163.com
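Catalog 里的表属性来自数据库本身,sink.buffer-flush.* 这类连接器参数一般可以在写入语句上用动态表选项(SQL hints)临时指定,示意如下(catalog/表名为假设值,旧版本可能需要先打开 table.dynamic-table-options.enabled):

-- 旧版本可能需要先开启动态表选项(写法随版本略有差异)
-- SET 'table.dynamic-table-options.enabled' = 'true';

INSERT INTO mypg_catalog.mydb.`public.sink_table`
    /*+ OPTIONS('sink.buffer-flush.interval'='10s', 'sink.buffer-flush.max-rows'='500') */
SELECT id, name FROM source_table;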


Re: 实时数据入库怎样过滤中间状态,保证最终一致

2022-02-28 文章 18703416...@163.com
首先确定 source 事件有 eventTime ,比如 source 的返回类型为 MySource
示例代码如下:
static class MySource {
    Long ts;
    String key;
    Object object;
}

env.addSource(new SourceFunction<MySource>() {
    @Override
    public void run(SourceContext<MySource> ctx) throws Exception {
        ctx.collect(new MySource());
    }
    @Override
    public void cancel() {
    }
}).keyBy(new KeySelector<MySource, String>() {
    @Override
    public String getKey(MySource value) throws Exception {
        return value.key;
    }
}).timeWindow(Time.seconds(10)).process(new ProcessWindowFunction<MySource, Object, String, TimeWindow>() {
    @Override
    public void process(String s, Context context, Iterable<MySource> elements,
                        Collector<Object> out) throws Exception {
        // 按事件时间倒序,只输出窗口内最新的一条
        List<MySource> collect =
                Lists.newArrayList(elements).stream().sorted(new Comparator<MySource>() {
                    @Override
                    public int compare(MySource o1, MySource o2) {
                        return o2.ts.compareTo(o1.ts);
                    }
                }).collect(Collectors.toList());
        if (collect.size() > 0) {
            out.collect(collect.get(0).object);
        }
    }
}).addSink(new SinkFunction<Object>() {
    @Override
    public void invoke(Object value, Context context) throws Exception {
        System.out.println(value);
    }
});





> 2022年3月1日 上午11:35,Lei Wang  写道:
> 
> 谢谢,这种是可以。
> 
> 取窗口内最新的数据怎么写合适呢,我直接这样试了下不符合预期:
> 
> env.addSource(consumer).keyBy(new KeySelector() {
>@Override
>public String getKey(String value) throws Exception {
>return value;
>}
> }).timeWindow(Time.seconds(10)).reduce((a,b)->b).addSink()
> 
> 实际上逆序输出了窗口内的所有记录。
> 
> 谢谢,
> 
> 王磊
> 
> 
> 
> On Mon, Feb 28, 2022 at 9:59 AM 18703416...@163.com <18703416...@163.com>
> wrote:
> 
>> keyBy 算子之后接 timewindow 窗口, 每个窗口如果有多条数据就取最新的一条。 至于对数据库的压力,取决于这个窗口的大小
>> 
>>> 2022年2月25日 下午6:45,Lei Wang  写道:
>>> 
>>> 场景描述:
>>> Kafka 中的数据直接入到 MySQL 数据库中,数据格式如下:
>>> order_id   status
>>> 只有两个字段, order_id 为主键,以 replace 覆盖方式写入到数据库中。
>>> 
>>> 对同一个 order_id, status 变化很频繁,为不对数据库造成压力,不会对每一条记录都做入库操作,但一定要保证这个 order_id
>>> 最终的状态不丢,但这个最终的状态也不确定是多少。
>>> 
>>> 我的做法是 KeyBy  orderId 后判断两条记录的时间间隔,如果时间间隔太小不做入库操作,但如果这个 order_id
>>> 最后来的两条记录时间间隔太小,会导致最终的状态丢失。
>>> 
>>> 请问有什么其他的解决方法吗?
>>> 
>>> 谢谢,
>>> 王磊
>> 
>> 



Re: 实时数据入库怎样过滤中间状态,保证最终一致

2022-02-27 文章 18703416...@163.com
keyBy 算子之后接 timewindow 窗口, 每个窗口如果有多条数据就取最新的一条。 至于对数据库的压力,取决于这个窗口的大小

> 2022年2月25日 下午6:45,Lei Wang  写道:
> 
> 场景描述:
> Kafka 中的数据直接入到 MySQL 数据库中,数据格式如下:
> order_id   status
> 只有两个字段, order_id 为主键,以 replace 覆盖方式写入到数据库中。
> 
> 对同一个 order_id, status 变化很频繁,为不对数据库造成压力,不会对每一条记录都做入库操作,但一定要保证这个 order_id
> 最终的状态不丢,但这个最终的状态也不确定是多少。
> 
> 我的做法是 KeyBy  orderId 后判断两条记录的时间间隔,如果时间间隔太小不做入库操作,但如果这个 order_id
> 最后来的两条记录时间间隔太小,会导致最终的状态丢失。
> 
> 请问有什么其他的解决方法吗?
> 
> 谢谢,
> 王磊



Re: 如何按比例丢弃kafka中消费的数据

2022-02-27 文章 18703416...@163.com
自定义 kafkasource 的 DeserializationSchema
丢弃的返回 null, source 的下一个filter 算子进行过滤即可

> 2022年2月26日 上午9:01,jack zhang  写道:
> 
> 1、flink程序资源有限,kafka中数据比较多,想要按一定比例丢弃数据(或者其它策略),减轻flink 程序压力,有什么方法吗?
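按上面回复的思路,自定义 DeserializationSchema 里把要丢弃的消息返回 null、下游再接一个 filter 即可。下面是一个按比例随机丢弃的示意草稿(类名 SamplingSchema、构造参数 dropRatio 都是演示用的假设):

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;

public class SamplingSchema implements DeserializationSchema<String> {

    private final double dropRatio;   // 例如 0.3 表示丢弃约 30% 的消息

    public SamplingSchema(double dropRatio) {
        this.dropRatio = dropRatio;
    }

    @Override
    public String deserialize(byte[] message) {
        if (ThreadLocalRandom.current().nextDouble() < dropRatio) {
            // 返回 null 表示丢弃,source 后面接一个 filter(Objects::nonNull) 过滤即可
            return null;
        }
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}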



Re: flink状态共享

2022-02-25 文章 18703416...@163.com
如果不同算子 需要共享状态,是否考虑 归为一个算子进行处理,同理后面的算子也是

> 2022年2月25日 下午4:30,huangzhi...@iwgame.com 写道:
> 
> 对于keyed datastream 不同的算子之间是否能够共享同一状态,或者后面的算子任务,是否可以拿到前一个算子任务中的状态?
> 
> 
> 
> huangzhi...@iwgame.com



Re: 如何给flink的输出削峰填谷?

2022-01-27 文章 18703416...@163.com
类似kafka这样的消息管道应该用来 削峰填谷,
可以先sink 至kafka,再从kafka -> db

> 2022年1月26日 上午2:11,Jing  写道:
> 
> Hi Flink中文社区,
> 
> 我碰到一个这样的问题,我的数据库有write throttle, 我的flink
> app是一个10分钟窗口的聚合操作,这样导致,每10分钟有个非常大量的写请求。导致数据库的sink有时候会destroy.
> 有什么办法把这些写请求均匀分布到10分钟吗?
> 
> 
> 谢谢,
> Jing



退订

2022-01-10 文章 xingb...@163.com
退订




Re: 关于时间窗口的问题

2022-01-09 文章 18703416...@163.com
你好,我理解你的意思了。
可以看下 flink-cep 相关内容, 利用模式匹配去实现

> 2022年1月8日 下午7:10,18765295...@163.com 写道:
> 
> 您好:
> 请教一个问题,
> 例如:开启一个5秒钟的滚动窗口,当key001的两条数据进来时,没有满足时间触发,但是当key002的数据进来满足窗口触发条件,会将key001的两条数据输出出去。
> 
> 我想实现的是一个基于事件时间设置的滚动窗口,当key001的数据到来时,没有满足时间时,不会因为key002的数据到来触发key001的数据进行输出。
> 每个key都有一个属于自己的时间窗口,不会受其他分组key的影响,并且可以为每个key的时间窗口设置一个基于数量和时间的触发器,当满足数量时触发或者时间到了触发。
> 
> 经过测试发现,现在设置的时间窗口里面会有不同key的数据在一起
> 每个分组是否有属于自己的时间窗口。
> 
> 
> 数量窗口的逻辑是每个key都有一个属于自己key的数量窗口,
> 例如:设置一个数量为3的滚动窗口,输入1,2,3,4,不会触发窗口执行,但是继续输入两条1的数据,会输出三个1的数据。
> 
> 请问时间窗口可以实现类似数量窗口这样的逻辑吗。
> 
> public class Test {
>public static void main(String[] args) throws Exception {
>StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>env.setParallelism(1);
>DataStreamSource dataSource = 
> env.socketTextStream("localhost", 7788);
>SingleOutputStreamOperator map = dataSource.map(new 
> MapFunction() {
>@Override
>public OrderItem map(String s) throws Exception {
>String[] split = s.split(",");
>return new OrderItem(split[0].trim(), 
> Double.parseDouble(split[1].trim()), Long.parseLong(split[2].trim()));
>}
>});
> 
>// 时间窗口测试代码
>SingleOutputStreamOperator warter = 
> map.assignTimestampsAndWatermarks(
>
> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0))
>.withTimestampAssigner((event, timestamp) -> 
> event.getTimeStamp()));
>SingleOutputStreamOperator timeWindow = warter.keyBy(data -> 
> data.getOrderId())
>.window(TumblingEventTimeWindows.of(Time.seconds(5)))
>.process(new ProcessWindowFunction TimeWindow>() {
>SimpleDateFormat sdf = new SimpleDateFormat("-MM-dd 
> HH:mm:ss");
> 
>@Override
>public void process(String key,
>ProcessWindowFunction String, String, TimeWindow>.Context context,
>Iterable iterable,
>Collector collector) throws 
> Exception {
>Iterator iterator = iterable.iterator();
>StringBuilder sb = new StringBuilder();
>sb.append("key -> " + key + "窗口开始时间:" + sdf.format(new 
> Date(context.window().getStart())) + "\t\n");
>while (iterator.hasNext()) {
>OrderItem next = iterator.next();
>sb.append(next + "\t\n");
>}
>sb.append("窗口结束时间:" + sdf.format(new 
> Date(context.window().getEnd(;
>collector.collect(sb.toString());
>}
>});
> 
>timeWindow.print();
>env.execute();
> 
>}
> }



关于时间窗口的问题

2022-01-08 文章 18765295...@163.com
您好:
请教一个问题,
例如:开启一个5秒钟的滚动窗口,当key001的两条数据进来时,没有满足时间触发,但是当key002的数据进来满足窗口触发条件,会将key001的两条数据输出出去。

我想实现的是一个基于事件时间设置的滚动窗口,当key001的数据到来时,没有满足时间时,不会因为key002的数据到来触发key001的数据进行输出。
每个key都有一个属于自己的时间窗口,不会受其他分组key的影响,并且可以为每个key的时间窗口设置一个基于数量和时间的触发器,当满足数量时触发或者时间到了触发。

经过测试发现,现在设置的时间窗口里面会有不同key的数据在一起
每个分组是否有属于自己的时间窗口。


数量窗口的逻辑是每个key都有一个属于自己key的数量窗口,
例如:设置一个数量为3的滚动窗口,输入1,2,3,4,不会触发窗口执行,但是继续输入两条1的数据,会输出三个1的数据。

请问时间窗口可以实现类似数量窗口这样的逻辑吗。

public class Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataSource = env.socketTextStream("localhost", 7788);
        SingleOutputStreamOperator<OrderItem> map = dataSource.map(new MapFunction<String, OrderItem>() {
            @Override
            public OrderItem map(String s) throws Exception {
                String[] split = s.split(",");
                return new OrderItem(split[0].trim(),
                        Double.parseDouble(split[1].trim()), Long.parseLong(split[2].trim()));
            }
        });

        // 时间窗口测试代码
        SingleOutputStreamOperator<OrderItem> warter = map.assignTimestampsAndWatermarks(
                WatermarkStrategy.<OrderItem>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner((event, timestamp) -> event.getTimeStamp()));
        SingleOutputStreamOperator<String> timeWindow = warter.keyBy(data -> data.getOrderId())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<OrderItem, String, String, TimeWindow>() {
                    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

                    @Override
                    public void process(String key,
                                        ProcessWindowFunction<OrderItem, String, String, TimeWindow>.Context context,
                                        Iterable<OrderItem> iterable,
                                        Collector<String> collector) throws Exception {
                        Iterator<OrderItem> iterator = iterable.iterator();
                        StringBuilder sb = new StringBuilder();
                        sb.append("key -> " + key + "窗口开始时间:" + sdf.format(new Date(context.window().getStart())) + "\t\n");
                        while (iterator.hasNext()) {
                            OrderItem next = iterator.next();
                            sb.append(next + "\t\n");
                        }
                        sb.append("窗口结束时间:" + sdf.format(new Date(context.window().getEnd())));
                        collector.collect(sb.toString());
                    }
                });

        timeWindow.print();
        env.execute();
    }
}
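除了上面回复提到的 flink-cep,另一个可以考虑的思路是给窗口配一个自定义 Trigger,让每个 key 的窗口在"数量达到阈值"或"事件时间到达窗口结束"时触发。下面是一个示意草稿(类名 CountOrTimeTrigger、阈值 3 都是演示用的假设,并非官方实现):

import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class CountOrTimeTrigger extends Trigger<Object, TimeWindow> {

    private final long maxCount;
    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("trigger-count", (a, b) -> a + b, Types.LONG);

    public CountOrTimeTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // 到窗口结束时间(事件时间)也触发一次
        ctx.registerEventTimeTimer(window.maxTimestamp());
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(countDesc).clear();
    }
}

// 使用示意(接在上面的 keyBy 之后):
// .window(TumblingEventTimeWindows.of(Time.seconds(5)))
// .trigger(new CountOrTimeTrigger(3))
// .process(...)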


【Log4j.properties使用AsyncAppender】

2021-12-16 文章 lpengdr...@163.com
Hi:

flink使用log4j.properties配置,.properties配置似乎没有办法配置AsyncAppender,所以在flink的日志没有办法使用AsyncAppender了吗?是否有其他的办法可以绕行



lpengdr...@163.com


回复: Re: flink sql支持细粒度的状态配置

2021-12-09 文章 gygz...@163.com
Hi Yun Tang

感谢你的回复,我们在调研的过程中也发现,正如你所说的生成的plan可能差异很大

但是每个operator的TTL生效时间是在execNode转换成对应的Transformation时,通过传入的StreamPlanner带进去的,TableConfig属性中包含了全局的TTL时间

在每个ExecNode转换的过程translateToPlanInternal((PlannerBase) 
planner)中使用这个TTL时间生成对应的operator

所以我们在考虑是否可以在,每个Transformation生成阶段,先去修改一下TableConfig中TTL的配置再调用每个execNode转换成operator的方法,来做到Transformation级别的TTL控制,这个配置开放给平台的用户,通过Transformation的id做识别,是否能给一些建议




gygz...@163.com
 
发件人: Yun Tang
发送时间: 2021-12-09 10:57
收件人: user-zh
主题: Re: flink sql支持细粒度的状态配置
Hi 你好,
 
我认为这是一个很好的需求,对于data stream以及python API来说,state 
TTL都是通过API逐个配置的,你的需求就可以直接满足。但是对于SQL来说,由于相同的SQL语句,不同优化器其生成的执行plan可能会差异很大,很难对某个operator内的state进行TTL进行配置,可能一种方式是增加一些SQL的优化hint,对于你示例中的join语句和groupBy
 的count语句配以不同的TTL,但是目前Flink SQL尚未支持该功能。
 
 
祝好
唐云
 

From: gygz...@163.com 
Sent: Tuesday, December 7, 2021 18:38
To: user-zh 
Subject: flink sql支持细粒度的状态配置
 
Hi all
 
在我们生产中发现,如果在sql中配置状态的TTL会导致这个 ttl时间全局生效
 
如果我存在一个如下sql
 
select count(1),region from (select * from A join B on a.uid = b.uid)  group by 
region
 
如果我配置一个全局的TTL会导致count这个GroupAggFunction的状态被淘汰掉,比如说一天以后累计就被清零
 
如果不配置,又会导致Regular join的状态增大
 
这是其中一个场景,这里只是举一个例子
 
主要是想询问针对 Sql中需要配置局部State的ttl时间,或者同一个任务每个sql配置不同的TTL时间,这种场景应该如何去做 ?
 
 
 
gygz...@163.com
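在这个功能落地之前,SQL 作业一般只能设置全局的空闲状态保留时间,做不到按算子或按语句单独配置,全局配置的大致写法如下(时长为举例,配置项名称以所用版本文档为准):

// 全局的空闲状态保留时间,对该 TableEnvironment 下的 SQL 状态统一生效(时长为举例)
tableEnv.getConfig().setIdleStateRetention(Duration.ofHours(24));
// 等价的配置项写法(较新版本提供)
tableEnv.getConfig().getConfiguration().setString("table.exec.state.ttl", "24 h");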


flink sql支持细粒度的状态配置

2021-12-07 文章 gygz...@163.com
Hi all

在我们生产中发现,如果在sql中配置状态的TTL会导致这个 ttl时间全局生效

如果我存在一个如下sql

select count(1),region from (select * from A join B on a.uid = b.uid)  group by 
region

如果我配置一个全局的TTL会导致count这个GroupAggFunction的状态被淘汰掉,比如说一天以后累计就被清零

如果不配置,又会导致Regular join的状态增大

这是其中一个场景,这里只是举一个例子

主要是想询问针对 Sql中需要配置局部State的ttl时间,或者同一个任务每个sql配置不同的TTL时间,这种场景应该如何去做 ?



gygz...@163.com


flink sql支持state TTL的细粒度局部配置

2021-11-30 文章 gygz...@163.com
Hi all

在我们生产中发现,如果在sql中配置状态的TTL会导致这个 ttl时间全局生效

如果我存在一个如下sql

select count(1),region from (select * from A join B on a.uid = b.uid)  group by 
region

如果我配置一个全局的TTL会导致count这个GroupAggFunction的状态被淘汰掉,比如说一天以后累计就被清零

如果不配置,又会导致Regular join的状态增大

这是其中一个场景,这里只是举一个例子

主要是想表达针对 Sql中需要配置局部State的ttl时间,这种场景应该如何去做 ?



gygz...@163.com

 
  


flink sql建表异常问题

2021-11-16 文章 18765295...@163.com
您好 
  我将flink 
sql建表程序提交到k8s执行时抛出如下异常,网上提示需要引入flink-table-planner-blink等依赖,但实际pom文件中已经引入,请问这个该如何解决,附部分pom文件,flink版本为1.13。谢谢。


取消订阅

2021-11-07 文章 tanggen...@163.com
取消订阅


tanggen...@163.com


回复:Flink Sql读取Hbase表

2021-11-07 文章 zst...@163.com
作为读的数据源时,使用的hbase的sdk 
scanner读取,不是全量读。见org.apache.flink.connector.hbase2.source.AbstractTableInputFormat#nextRecord。


作为维表时,使用Guava 
cache缓存每次join到的key。见org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction#eval。


Best Wishes!
- Yuan
在2021年11月7日 16:26,guanyq 写道:
请大佬指导下:

-- 在 Flink SQL 中注册 HBase 表 "mytable"
CREATE TABLE hTable (
  rowkey INT,
  family1 ROW<...>,
  family2 ROW<...>,
  family3 ROW<...>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'mytable',
  'zookeeper.quorum' = 'localhost:2181'
);
Flink sql在读取hbase表时,是一次将数据加载到内存还是每次加载一批数据呀?
其实就是想知道,如果hbase表数据量特别大的时候,Flink sql是如何处理的?





公司数据密文,实现group by和join

2021-10-27 文章 lyh1067341...@163.com
您好:
目前公司数据都是密文,要进行密文数据的比较或者计算的话,只能调用公司密文计算的接口,去看了下flink的分组和join算子,都只能指定分组的key或者join的key,不知道怎么改写比较的规则,我用mapreduce实现了重写shuffle的比较规则,可以实现密文下的join和group
 by,对于使用spark和flink算子不知道如何实现。

问题:
请问有啥办法,实现密文下的join和group by操作吗?(在不能解密,只能调用公司密文计算的接口)

谢谢您。



发自 网易邮箱大师

退订

2021-10-20 文章 aegean0...@163.com


| |
aegean0933
邮箱:aegean0...@163.com
|
退订

退订

2021-10-14 文章 tanggen...@163.com

退订


tanggen...@163.com


退订

2021-10-14 文章 tanggen...@163.com
退订



tanggen...@163.com


Flink Stream + StreamTableEnvironment 结合使用时checkpoint异常问题

2021-09-10 文章 xia_...@163.com
Hi:
有个问题想请教一下大佬们:正在研究流上join操作,使用FlinkKafkaConsume 
消费kafka数据作为数据源,随后关联hbase维度表数据,可以成功关联,但是KafkaSource缺始终没有进行checkpoint,代码中是有设置checkpint的,我想请问一下是需要做其他什么配置吗?代码如下

DataStream kafkaSource = env.addSource(source);
Map> sideOutStreamMap = new HashMap<>();
for (RowToColumnBean bean : lists) {
OutputTag app = new OutputTag(bean.getMainTable()) {
};
sideOutStreamMap.put(bean.getMainTable(), app);
}

RowToNumberProcessFunction rowToNumberProcessFunction = new 
RowToNumberProcessFunction(sideOutStreamMap, lists);
SingleOutputStreamOperator process = 
kafkaSource.process(rowToNumberProcessFunction);

EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();

StreamTableEnvironment tableEnv = StreamTableEnvironmentImpl.create(env, 
settings, new TableConfig());
//设置checkpoint
tableEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval",
 "10 s");

for (RowToColumnBean bean : lists) {
DataStream dataStream = 
process.getSideOutput(sideOutStreamMap.get(bean.getMainTable()));

String mainTable = bean.getMainTable().split(" 
")[0].split("\\.")[1].toLowerCase();

//Table tmpTable = tableEnv.fromDataStream(dataStream, 
StrUtil.list2Str(bean.getQueryColumns()));

tableEnv.createTemporaryView(mainTable, dataStream);

String joinTable = mainTable + "_join";
tableEnv.executeSql("CREATE TABLE " + joinTable + "(\n" +
"rowkey STRING,\n" +
"info ROW,\n" +
"PRIMARY KEY (rowkey) NOT ENFORCED\n" +
") WITH (\n" +
"'connector' = 'hbase-2.2',\n" +
"'table-name' = 'hid0101_cache_his_dhcapp_nemrforms:dformfiled',\n" 
+
"'zookeeper.quorum' = '192.168.0.115:2181',\n" +
"'zookeeper.znode.parent' = '/hbase'\n" +
")");


//查询数据
//Table table = tableEnv.sqlQuery("select b.* from tmp a left join  
dformfiled b on a.key = b.rowkey");
Table table = tableEnv.sqlQuery("select a.*,b.* from " + mainTable + " a 
left join " + joinTable + " b on a.key = lower(b.rowkey) and b.formid='550' 
where b.rowkey is not null");

TableSchema schema = table.getSchema();
schema.getTableColumns().forEach(column -> {

System.err.println(column.asSummaryString());
});

DataStream> tuple2DataStream = 
tableEnv.toRetractStream(table, Row.class);
tuple2DataStream.print(mainTable);
dataStream.print(mainTable);
}


xia_...@163.com
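补充一点:上面只在 TableConfig 里设置了 execution.checkpointing.interval,这个配置不一定会传递到底层的 StreamExecutionEnvironment;对 DataStream 里的 Kafka source,更稳妥的做法是直接在 env 上开启 checkpoint,示意如下(间隔时间为举例):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 直接在 env 上开启 checkpoint,Kafka source 的 offset 提交依赖它
env.enableCheckpointing(10_000L);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);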


Flink OperatorChain AsyncWaitOperator

2021-09-01 文章 lpengdr...@163.com
Hi:

请教下Flink的operator-chain的机制, 
我看AsyncWaitOperator的chainingStrategy是ALWAYS,但是实际使用发现在所有算子并行度都设置为1的情况下这个operator没有办法和上游算子chain串起来只能跟下游的算子slot共享,这里边是还有什么其他规则吗?





退订

2021-08-24 文章 aegean0...@163.com
退订

| |
aegean0933
邮箱:aegean0...@163.com
|

Re: Re: filesystem connector不支持跨subtask合并小文件

2021-08-04 文章 lixin58...@163.com
你好,
生成的三个文件挺小的,不到2kb,1k多一点,配这个是为了合并后比2k大



lixin58...@163.com
 
发件人: Rui Li
发送时间: 2021-08-05 11:42
收件人: user-zh
主题: Re: filesystem connector不支持跨subtask合并小文件
你好,
 
看到你的compaction.file-size配置成了2kb,这个是希望合并以后的文件的target size只有2kb么
 
On Wed, Aug 4, 2021 at 5:39 PM lixin58...@163.com 
wrote:
 
> 你好,
> 在使用filesystem
> connector过程中,开启了compaction,使用parquet列式文件,指定3个并行度,但发现无论如何也触发不了合并,因为列式文件是checkpoint触发时才会滚动,这样同一checkpoint内会产生与并行度相同的文件,按说此时文件数已经大于1了,为什么不合并呢?
>
> create table fs_parquet_compact
> (userid bigint, name string, part string)
> PARTITIONED BY (part)
> with(
> 'connector' = 'filesystem',
> 'path' = 'hdfs:///data/fs_parquet_compact',
> 'format' = 'parquet',
> 'auto-compaction' = 'true',
> 'compaction.file-size' = '2kb',
> 'sink.rolling-policy.file-size' = '500b',
> 'sink.rolling-policy.rollover-interval' = '800s',
> 'sink.rolling-policy.check-interval' = '60s'
> );
>
>
>
> lixin58...@163.com
>
 
 
-- 
Best regards!
Rui Li


filesystem connector不支持跨subtask合并小文件

2021-08-04 文章 lixin58...@163.com
你好,
在使用filesystem 
connector过程中,开启了compaction,使用parquet列式文件,指定3个并行度,但发现无论如何也触发不了合并,因为列式文件是checkpoint触发时才会滚动,这样同一checkpoint内会产生与并行度相同的文件,按说此时文件数已经大于1了,为什么不合并呢?

create table fs_parquet_compact
(userid bigint, name string, part string)
PARTITIONED BY (part)
with(
'connector' = 'filesystem',
'path' = 'hdfs:///data/fs_parquet_compact',
'format' = 'parquet',
'auto-compaction' = 'true',
'compaction.file-size' = '2kb',
'sink.rolling-policy.file-size' = '500b',
'sink.rolling-policy.rollover-interval' = '800s',
'sink.rolling-policy.check-interval' = '60s'
);



lixin58...@163.com


Re: Re: filesystem table parquet 滚动问题

2021-07-26 文章 lixin58...@163.com
你好,

感谢回复,看了下这个文档,提到对于parquet这种列式文件只会使用onCheckpointRollPolicy,也就是只在做检查点时会滚动。flink 
filesystem table这块的parquet列式文件写入是不是也这样呢?
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/streamfile_sink.html
 




lixin58...@163.com
 
发件人: Jingsong Li
发送时间: 2021-07-27 10:30
收件人: user-zh
主题: Re: filesystem table parquet 滚动问题
parquet因为它会在内存中攒buffer,所以文件的file-size并不能很精确。。只能等它flush了才会生效。
 
On Sun, Jul 25, 2021 at 9:47 AM lixin58...@163.com 
wrote:
 
> 大家好,
>
> 检查点配的是120s,滚动时长800s,滚动大小1kb,并行度配的2
>
>
> 不过在跑的过程中发现不管写入的多快,同时只存在一个in-progress文件,且最终生成的文件是严格按照120s生成的,这个很奇怪,似乎只有按检查点滚动生效了,与json格式的不一样。真的是这样吗?不过看官方文档没有这样说
>
> 求大佬们解惑!
>
> create table fs_parquet
> (userid bigint, name string, part string)
> PARTITIONED BY (part)
> with(
> 'connector' = 'filesystem',
> 'path' = 'hdfs:///data/fs_parquet',
> 'format' = 'parquet',
> 'sink.rolling-policy.file-size' = '1kb',
> 'sink.rolling-policy.rollover-interval' = '800s',
> 'sink.rolling-policy.check-interval' = '60s'
> );
>
>
>
>
 
-- 
Best, Jingsong Lee


filesystem table parquet 滚动问题

2021-07-24 文章 lixin58...@163.com

大家好,

检查点配的是120s,滚动时长800s,滚动大小1kb,并行度配的2

不过在跑的过程中发现不管写入的多快,同时只存在一个in-progress文件,且最终生成的文件是严格按照120s生成的,这个很奇怪,似乎只有按检查点滚动生效了,与json格式的不一样。真的是这样吗?不过看官方文档没有这样说

求大佬们解惑!

create table fs_parquet
(userid bigint, name string, part string)
PARTITIONED BY (part)
with(
'connector' = 'filesystem',
'path' = 'hdfs:///data/fs_parquet',
'format' = 'parquet',
'sink.rolling-policy.file-size' = '1kb',
'sink.rolling-policy.rollover-interval' = '800s',
'sink.rolling-policy.check-interval' = '60s'
);





flink时态表:两个Hbase左关联有报错情况

2021-07-13 文章 xie_guo...@163.com
您好,有关flinkSQL时态表左关联时遇到了问题。
具体场景:

两个Hbase表做时态表关联,左表定义了处理时间,右表原本不变(rowkey,cf),FLINK版本1.13.0,报错情况如下,麻烦问一下大家又遇到类似问题嘛,该怎么处理!

2021-07-14 09:22:20.592 WARN  org.apache.flink.runtime.taskmanager.Task  --- 
2021-07-14 09:22:20.596 WARN  org.apache.flink.runtime.taskmanager.Task  --- 
LookupJoin(table=[default_catalog.default_database.hbase_source_pollution_dwb_enterprise_wodhz],
 joinType=[LeftOuterJoin], async=[true], lookup=[code=$f4], select=[code, 
data1, data2, p, $f4, code0, data]) -> Calc(select=[code, ROW(,,data.activ) 
-> NotNullEnforcer(fields=[EXPR$1, EXPR$2, p, EXPR$4]) (3/3)#3 
(4ada188e117c67ccd9bd6488ae95216a) switched from RUNNING to FAILED with failure 
cause: java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: 
org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:168)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:131)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:448)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:671)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:629)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: 
org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils
at 
org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.AsyncConnectionImpl.close(AsyncConnectionImpl.java:193)
at 
org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction.close(HBaseRowDataAsyncLookupFunction.java:251)
at LookupFunction$3.close(Unknown Source

ps:同样的代码,左表换成Kafka,能够正常运行。网上搜索了一下相关报错,好像时缺包,目前pom文件是有hbase-client、hbase-commen、flink-sql-connector-hbase。



Sincerely,
xie_guo...@163.com


回复: Processing-time temporal join is not supported yet

2021-06-23 文章 jiangshan0...@163.com
 Join  
?? Join ?? 
?? Join ?? Join  Join 
??  join 
??,
 ??

 
watermark



jiangshan0...@163.com
 
发件人: op
日期: 2021-06-23 17:03
收件人: user-zh
主题: Processing-time temporal join is not supported yet
hi,大家好,kafka 表做 temporal join 时报了如下错误:
org.apache.flink.table.api.TableException: Processing-time temporal join is not 
supported yet.
 
sql 如下:
 
 
create view visioned_table as  
  select
   user_id,
   event
 from
(select
    user_id,
    event,
    row_number() over(partition by user_id order by event_time desc) 
as rn
    from kafka_table1
    )ta where rn=1;
select
      t1.*,t2.*
    from mvp_rtdwd_event_app_quit t1
 join visioned_table FOR SYSTEM_TIME AS OF t1.proc_time AS t2
   on t1.user_id=t2.user_id 
   where t1.user_id is not null
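普通的 kafka 流做右表时,processing-time temporal join 目前确实不支持(通常要求右表是 HBase/JDBC 这类 lookup 源)。如果右表能声明 watermark 并通过去重得到主键,可以考虑改写成基于事件时间的 temporal join,大致写法如下(示意:假设 kafka_table1 定义了 event_time 和 watermark,左表也有事件时间属性,具体语法以所用版本文档为准):

-- 右表先按 user_id 去重并保留事件时间,作为带版本的视图(要求 kafka_table1 声明了 watermark)
CREATE VIEW versioned_table AS
SELECT user_id, event, event_time
FROM (
    SELECT user_id, event, event_time,
           ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY event_time DESC) AS rn
    FROM kafka_table1
) t
WHERE rn = 1;

-- 用左表的事件时间做 temporal join
SELECT t1.*, t2.event
FROM mvp_rtdwd_event_app_quit t1
LEFT JOIN versioned_table FOR SYSTEM_TIME AS OF t1.event_time AS t2
ON t1.user_id = t2.user_id;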


Re: flink1.12.2 sql session窗口间隔时间段内没有新数据窗口不关闭

2021-06-17 文章 lpengdr...@163.com
你用EventTimeSession窗口的超时也是按照你的事件时间来判断的,要有超过超时时间边界的数据输入了才能触发



lpengdr...@163.com
 
发件人: raofang
发送时间: 2021-06-18 12:20
收件人: user-zh
主题: flink1.12.2 sql session窗口间隔时间段内没有新数据窗口不关闭
hi,请教大家一个问题:
flink1.12.2 sql BlinkPlanner
使用基于routime的session窗口时,在设置的间隔时间10分钟内没有接收到新数据,窗口没有关闭输出计算结果;但是接收到10分钟之后的新数据时上一次的session窗口才关闭输出结果。不知道是什么原因导致超过间隔时间没有新数据窗口没有关闭的问题呢?
 谢谢~
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: Flink SQL 1.11.3问题请教

2021-06-02 文章 yinghua...@163.com
我这个情况还有点不一样的,本来单条数据是如下的:一条数据对应一个offset
 {"name":"test1"}
但是Nifi采集数据后,写入kafka格式是下面这样的,一个offset对应下面几条数据(每一个offset对应的真实数据条数还不是固定的)
 {"name":"test1"}
 {"name":"test2"}
 {"name":"test3"}
...

感谢你的回复,我借鉴下看怎么处理下,多谢了!




yinghua...@163.com
 
发件人: WeiXubin
发送时间: 2021-06-02 17:44
收件人: user-zh
主题: Re: Flink SQL 1.11.3问题请教
不知道下面场景是否与你描述的场景相同 ,假设采集到单条json消息格式为 {"name":"test"},将多条json消息合并为一条的格式为 
[{"name":"test"},{"name":"test2"}]。 我的 Flink 任务是采用 FlinkSQL
编写,处理这种情况我的方式是通过编写一个 UDF (TableFunction), 之后在这个 UDF 中进行数据拆解,变为多行 (row),再输出到
sink。
 
Row row = new Row(arity);
collect(row);
 
具体使用可参考:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/
 
Best,Weixubin
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/
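按上面回复的思路,拆行的 UDF 大致可以这样写(示意草稿:假设一条 kafka 消息里是多行 JSON、按换行符分隔,类名 SplitRows、字段名 raw_message 都是演示用的假设,JSON 的具体解析按实际格式替换):

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

@FunctionHint(output = @DataTypeHint("ROW<line STRING>"))
public class SplitRows extends TableFunction<Row> {

    // 把一条消息按换行拆成多行输出,每一行再交给下游去解析 JSON
    public void eval(String message) {
        if (message == null) {
            return;
        }
        for (String line : message.split("\n")) {
            if (!line.trim().isEmpty()) {
                collect(Row.of(line));
            }
        }
    }
}

// 注册并使用(示意):
// tableEnv.createTemporarySystemFunction("split_rows", SplitRows.class);
// SELECT t.line FROM kafka_source, LATERAL TABLE(split_rows(raw_message)) AS t(line)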


Flink SQL 1.11.3问题请教

2021-05-31 文章 yinghua...@163.com
我们使用Nifi将数据采集到kafka,然后使用Flink处理kafka中的数据,现在Nifi如果将多条数据当做一条记录(写入kafka中的数据格式为csv或者json)写入kafka后(就是一个offset中包含了多条数据),Flink只会处理其中的一条数据?有没有什么办法让Flink都处理一个offset中的多条数据?



yinghua...@163.com


退订

2021-05-19 文章 zander0...@163.com
退订



周德虎
 
电话:15021351770
邮箱:zander0...@163.com



回复: Re: 扩展SqlServerDialect 运行在flink on k8s报错

2021-05-07 文章 18756225...@163.com
非常感谢!


 
发件人: Leonard Xu
发送时间: 2021-05-07 14:26
收件人: user-zh
主题: Re: 扩展SqlServerDialect 运行在flink on k8s报错
Hi
 
看日志是加载不到对应的class文件,(1)可以对比下你jar包里的路径是不是不正确,(2) 检查下集群上是不是还有之前的jar包,没替换干净
 
祝好
Leonard
 
> 在 2021年5月7日,13:58,18756225...@163.com 写道:
> 
> 大家好,遇到一个问题:
> 坏境:flink 版本1.12.1,  k8s集群为session模式,  该集群之前可以将数据正常写入到mysql
> 参考mysqlDialect 扩展了一个 
> SqlServerDialect,替换了flink的lib包下的flink-connector-jdbc_2.11-1.12.1.jar,在on 
> yarn时 任务正常运行,flink-sql也可以将数据写入到sqlserver
> 在同一台机器坏境 提交到k8s搭建的flink session模式集群, 就报如下错误 , JdbcBatchingOutputFormat 
> 这个类加载不到? 
> 
> 谢谢!
> 
> 完整异常如下:
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
>at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
>at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>at java.lang.reflect.Method.invoke(Method.java:498)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: 
> Cannot load user class: 
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat
> ClassLoader info: URL ClassLoader:
> Class not resolvable through given classloader.
>at 
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
>at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
>at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
>at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
>at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:574)
>at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
>at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:164)
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.i

扩展SqlServerDialect 运行在flink on k8s报错

2021-05-06 文章 18756225...@163.com
大家好,遇到一个问题:
 坏境:flink 版本1.12.1,  k8s集群为session模式,  该集群之前可以将数据正常写入到mysql
 参考mysqlDialect 扩展了一个 
SqlServerDialect,替换了flink的lib包下的flink-connector-jdbc_2.11-1.12.1.jar,在on yarn时 
任务正常运行,flink-sql也可以将数据写入到sqlserver
 在同一台机器坏境 提交到k8s搭建的flink session模式集群, 就报如下错误 , JdbcBatchingOutputFormat 
这个类加载不到? 

谢谢!

完整异常如下:
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
load user class: 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:574)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:164)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstCl

Flink SQL问题请教:Flink SQL中支持在一个TableEnvironment中多个DML语句提交时共用一个Yarn任务来运行吗?

2021-04-30 文章 yinghua...@163.com




yinghua...@163.com
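可以的:同一个 TableEnvironment 里把多条 INSERT 用 StatementSet 打包后一次提交,它们会被编译进同一个作业图,作为同一个 job 运行;如果分别 executeSql,每条 INSERT 会各自生成一个作业。大致写法如下(表名为举例):

StatementSet statementSet = tableEnv.createStatementSet();
statementSet.addInsertSql("INSERT INTO sink_a SELECT id, name FROM source_table");
statementSet.addInsertSql("INSERT INTO sink_b SELECT id, ts FROM source_table");
// 多条 DML 被编译进同一个作业图,execute() 只提交一个 job
statementSet.execute();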


flink Kafka producer fails at runtime

2024-04-30 by tanggen...@163.com
My Flink job consumes a Kafka topic and sends late data to a new topic through a side output. It keeps throwing the error below; it appears every time a checkpoint is committed, and the job then restarts.
Could you advise whether any additional settings are needed?
2021-04-30 17:00:51
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send 
data to Kafka: Producer attempted an operation with an old epoch. Either there 
is a newer producer with the same transactionalId, or the producer's 
transaction has been expired by the broker.
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1282)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:816)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:703)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:794)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:58)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.sideOutput(WindowOperator.java:558)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:422)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:185)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
attempted an operation with an old epoch. Either there is a newer producer with 
the same transactionalId, or the producer's transaction has been expired by the 
broker.




tanggen...@163.com
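
A hedged sketch of the setting that usually matters for this ProducerFencedException, assuming the side-output sink uses exactly-once semantics: the Kafka transaction timeout must be comfortably longer than the checkpoint interval, and no longer than the broker's transaction.max.timeout.ms (15 minutes by default); otherwise the broker expires the transaction between checkpoints and fences the producer. The broker address, topic name and serialization below are placeholders, not values from the original mail.

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class LateDataSinkFactory {

    public static FlinkKafkaProducer<String> createLateDataSink() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");   // placeholder broker
        // Longer than the checkpoint interval, within the broker's maximum.
        props.setProperty("transaction.timeout.ms", "900000");

        return new FlinkKafkaProducer<>(
                "late-data-topic",                                // placeholder topic
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
                        return new ProducerRecord<>("late-data-topic",
                                element.getBytes(StandardCharsets.UTF_8));
                    }
                },
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }
}

If exactly-once delivery is not required for the late-data topic, Semantic.AT_LEAST_ONCE avoids Kafka transactions and this failure mode altogether.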


Re: Parsing non-standard JSON from Kafka

2021-04-27 by 19971028...@163.com
You could try writing a Kafka interceptor to clean up the JSON.



19971028...@163.com
 
From: guoyb
Date: 2021-04-27 17:55
To: user-zh
Subject: Parsing non-standard JSON from Kafka
Hi! How can records in the following format be parsed with SQL?
string {name=string} {id : 1, name : abram}
e.g.:
2021-04-03x {name=aa} {id : 1, name : abram}
The third field, the JSON, is the important one; it contains the data.
The first and second fields can be kept or dropped.
 
 
Is there any approach or demo I could refer to?
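
A hedged DataStream-side alternative to an interceptor (a sketch, not from the thread): trim each line down to its third field before it reaches SQL or a JSON parser. Because the keys and values in the payload are unquoted, a strict JSON parser may still reject it, so the actual parsing step is deliberately left out here.

import org.apache.flink.api.common.functions.MapFunction;

/** Keeps only the third field of lines like "2021-04-03x {name=aa} {id : 1, name : abram}". */
public class ExtractJsonField implements MapFunction<String, String> {

    @Override
    public String map(String line) {
        int first = line.indexOf('{');
        int second = line.indexOf('{', first + 1);
        // Fall back to the whole line if the expected layout is not present.
        return second >= 0 ? line.substring(second) : line;
    }
}

With the payload isolated, the first two fields are simply dropped, which matches the remark that they are optional.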


Problem starting Flink on a Yarn cluster

2021-04-21 by tanggen...@163.com
:
 Start request for container_1618931441017_0004_03_01 by user root
2021-04-20 23:34:12,570 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
 TimelineService V2.0 is not enabled. Skipping updating flowContext for 
application application_1618931441017_0004
2021-04-20 23:34:12,571 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl:
 Adding container_1618931441017_0004_03_01 to application 
application_1618931441017_0004
2021-04-20 23:34:12,572 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl:
 Container container_1618931441017_0004_03_01 transitioned from NEW to 
LOCALIZING
2021-04-20 23:34:12,572 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: Got 
event CONTAINER_INIT for appId application_1618931441017_0004
2021-04-20 23:34:12,574 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl:
 Container container_1618931441017_0004_03_01 transitioned from LOCALIZING 
to SCHEDULED
2021-04-20 23:34:12,574 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler:
 Starting container [container_1618931441017_0004_03_01]
2021-04-20 23:34:12,600 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl:
 Container container_1618931441017_0004_03_01 transitioned from SCHEDULED 
to RUNNING
2021-04-20 23:34:12,600 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Starting resource-monitoring for container_1618931441017_0004_03_01
2021-04-20 23:34:12,603 INFO 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: 
launchContainer: [bash, 
/opt/hadoop/hadoopdata/nm-local-dir/usercache/root/appcache/application_1618931441017_0004/container_1618931441017_0004_03_01/default_container_executor.sh]
2021-04-20 23:34:12,905 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 container_1618931441017_0004_03_01's ip = 10.100.8.108, and hostname = 
node108
2021-04-20 23:34:12,911 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Skipping monitoring container container_1618931441017_0004_03_01 since CPU 
usage is not yet available.
2021-04-20 23:34:16,067 WARN 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code 
from container container_1618931441017_0004_03_01 is : 1
2021-04-20 23:34:16,067 WARN 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exception 
from container-launch with container ID: container_1618931441017_0004_03_01 
and exit code: 1
ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:1009)
at org.apache.hadoop.util.Shell.run(Shell.java:902)
at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1227)
at 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:294)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.launchContainer(ContainerLaunch.java:501)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:311)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:106)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2021-04-20 23:34:16,067 INFO 
org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: Exception from 
container-launch.
2021-04-20 23:34:16,067 INFO 
org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: Container id: 
container_1618931441017_0004_03_01
2021-04-20 23:34:16,067 INFO 
org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: Exit code: 1
2021-04-20 23:34:16,067 WARN 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
 Container launch failed : Container exited with a non-zero exit code 1.
2021-04-20 23:34:16,069 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl:
 Container container_1618931441017_0004_03_01 transitioned from RUNNING to 
EXITED_WITH_FAILURE
2021-04-20 23:34:16,069 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
 Cleaning up container container_1618931441017_0004_03_01



tanggen...@163.com


Application application_1618931441017_0004

2021-04-20 by tanggen...@163.com


Deploying Flink on k8s: how to change the default lib directory

2021-04-18 by cxydeve...@163.com
The default lib path is /opt/flink/lib.
I cannot modify /opt/flink/lib, but I still need to put things into it. Can the lib path be changed by specifying it in flink-conf.yaml?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Unsubscribe

2021-04-07 by huangqibing...@163.com
Unsubscribe



huangqibing...@163.com
 
From: Chen Virtual
Date: 2021-04-08 12:19
To: user-zh@flink.apache.org
Subject: Unsubscribe
Unsubscribe


Unsubscribe

2021-03-31 by zhaorui_9...@163.com
Unsubscribe


zhaorui_9...@163.com


Unsubscribe

2021-03-25 by aegean0...@163.com


Unsubscribe

aegean0933
Email: aegean0...@163.com

Unsubscribe

2021-03-24 by aegean0...@163.com
Unsubscribe

Re: flink 1.12.0 k8s session deployment exception

2021-03-24 by 18756225...@163.com
I hit this problem as well. Jobs can be submitted to the cluster normally, but the jobmanager log keeps printing this. Is there any way to fix it?

 
From: casel.chen
Date: 2021-02-07 16:33
To: user-zh@flink.apache.org
Subject: flink 1.12.0 k8s session deployment exception
After deploying a session-mode Flink cluster on k8s, the jobmanager reports the error below. What causes it, and how can it be fixed?
 
 
2021-02-07 08:21:41,873 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
   [] - Starting RPC endpoint for 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at 
akka://flink/user/rpc/dispatcher_1 .
2021-02-07 08:21:43,506 WARN  
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint   [] - Unhandled 
exception
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_275]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) 
~[?:1.8.0_275]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) 
~[?:1.8.0_275]
at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_275]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) 
~[?:1.8.0_275]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
2021-02-07 08:21:43,940 WARN  
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint   [] - Unhandled 
exception
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_275]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) 
~[?:1.8.0_275]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) 
~[?:1.8.0_275]
at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_275]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) 
~[?:1.8.0_275]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]


Re: flink 1.11.2 with rocksdb throws org.apache.flink.util.SerializedThrowable

2021-03-20 by hdxg1101300...@163.com
Found the cause.



hdxg1101300...@163.com
 
From: hdxg1101300...@163.com
Date: 2021-03-20 22:07
To: user-zh
Subject: flink 1.11.2 with rocksdb throws org.apache.flink.util.SerializedThrowable
Hi,
I recently upgraded Flink from 1.10.2 to 1.11.2, mainly because the logs had grown too large to inspect conveniently.
The code is unchanged and was only recompiled against 1.11.2; the cluster and client were upgraded to 1.11.2, and jobs are submitted to Yarn in per-job mode.
Previously each taskmanager had one slot; now each taskmanager has 2 slots. After the job has been running for a while (about an hour), the following appears:
Caused by: org.apache.flink.util.SerializedThrowable
 
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete 
snapshot 53 for operator Sink: 发送短信 (5/8). Failure reason: Checkpoint was 
declined.
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:198)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
 [flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
 [flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
 [flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
[flink-dist_2.11-1.11.2.jar:1.11.2]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Caused by: org.apache.flink.util.SerializedThrowable
at com.com.functions.Transaction.firstPhase(Transaction.java:193) 
~[dc_cbssbroadband-1.0.4.3.1-jar-with-dependencies.jar:?]
at com.com.functions.TransactionData.flush(TransactionData.java:37) 
~[dc_cbssbroadband-1.0.4.3.1-jar-with-dependencies.jar:?]
at com.com.utils.TwoPhaseHttpSink.preCommit(TwoPhaseHttpSink.java:105) 
~[dc_cbssbroadband-1.0.4.3.1-jar-with-dependencies.jar:?]
at com.com.utils.TwoPhaseHttpSink.preCommit(TwoPhaseHttpSink.java:39) 
~[dc_cbssbroadband-1.0.4.3.1-jar-with-dependencies.jar:?]
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:321)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at

flink 1.11.2 with rocksdb throws org.apache.flink.util.SerializedThrowable

2021-03-20 by hdxg1101300...@163.com
 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]



hdxg1101300...@163.com


Generating classes dynamically with codehaus.janino and mapping JSON objects onto them in a map function: my generated class cannot be found

2021-03-16 by hdxg1101300...@163.com
Hi,
Based on a data dictionary I generate classes dynamically, and then in a map function I map my JSON strings onto the generated class.
public static Class<?> getClazz(String className, String cls) throws Exception {
    SimpleCompiler compiler = new SimpleCompiler();

    compiler.cook(cls);

    compiler.setParentClassLoader(Thread.currentThread().getContextClassLoader());

    return compiler.getClassLoader().loadClass("Dc" + className);
}

Class<?> clazz = (Class<?>) ClassUtil.getClazz(upClassName, s);
In the map function, JSONObject.parseObject(value.toString(), clazz) does the conversion.
When the job runs, it reports that the dynamically generated class cannot be found:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load 
user class: DcItcast_accreds
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:272)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.&lt;init&gt;(OperatorChain.java:155)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:459)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: DcItcast_accreds
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1807)
at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1770)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1595)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2343)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2267)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2125)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:260)



hdxg1101300...@163.com
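
A hedged sketch of one way around this ClassNotFoundException, assuming the generated source can be shipped to the task (here as constructor arguments): a class cooked on the client only exists inside the client's classloader, so compiling it again in open(), on the TaskManager, puts it into a classloader the running task can actually see. Class and field names below are illustrative, with "DcItcast_accreds" taken from the error above.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.codehaus.janino.SimpleCompiler;

import com.alibaba.fastjson.JSONObject;

public class DynamicJsonMap extends RichMapFunction<String, Object> {

    private final String className;    // e.g. "DcItcast_accreds"
    private final String classSource;  // source text generated from the data dictionary

    private transient Class<?> clazz;

    public DynamicJsonMap(String className, String classSource) {
        this.className = className;
        this.classSource = classSource;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        SimpleCompiler compiler = new SimpleCompiler();
        // Parent the generated class on the task's user-code classloader so it
        // can reference the other user classes shipped with the job.
        compiler.setParentClassLoader(getRuntimeContext().getUserCodeClassLoader());
        compiler.cook(classSource);
        clazz = compiler.getClassLoader().loadClass(className);
    }

    @Override
    public Object map(String value) throws Exception {
        // Downstream operators may need an explicit returns(...) hint, since the
        // concrete element type is only known at runtime.
        return JSONObject.parseObject(value, clazz);
    }
}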


Re: flink 1.12 streaming write to Hive fails with "Failed to create Hive RecordWriter"

2021-03-15 by yinghua...@163.com
Caused by: java.lang.OutOfMemoryError: Java heap space



yinghua...@163.com
 
From: william
Date: 2021-03-15 16:32
To: user-zh
Subject: flink 1.12 streaming write to Hive fails with "Failed to create Hive RecordWriter"
flink 1.12
hadoop 2.7.5
hive 2.3.6
 
Error message:
2021-03-15 16:29:43
org.apache.flink.connectors.hive.FlinkHiveException:
org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create
Hive RecordWriter
at
org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:165)
at
org.apache.flink.connectors.hive.write.HiveBulkWriterFactory.create(HiveBulkWriterFactory.java:48)
at
org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:263)
at
org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:235)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:243)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:220)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:305)
at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:103)
at
org.apache.flink.table.filesystem.stream.AbstractStreamingWriter.processElement(AbstractStreamingWriter.java:140)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at StreamExecCalc$36.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
Caused by: org.apache.flink.table.catalog.exceptions.CatalogException:
Failed to create Hive RecordWriter
at
org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:77)
at
org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:157)
... 34 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:67)
... 35 more
Caused by: java.lang.OutOfMemoryError: Java heap space
 
 
 
--
Sent
