java.lang.OutOfMemory:null

2024-07-07, posted by 冯路路
Hi

A Flink job runs smoothly for a while, with resources and data volume both stable. After some time it suddenly reports java.lang.OutOfMemory: null while parsing a JSON object, and then CPU and memory climb straight up until resources are completely exhausted and it reports java.lang.OutOfMemory: Java heap space. After adding resources, the same problem appears again after a while. What could be the cause? If it is a memory leak, why do CPU and memory run completely flat for a period first? Shouldn't they be rising steadily the whole time?




Re: [ANNOUNCE] Apache Flink Kubernetes Operator 1.9.0 released

2024-07-03, posted by Őrhidi Mátyás
Thank you, Gyula!
Cheers
On Wed, Jul 3, 2024 at 8:00 AM Gyula Fóra  wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink Kubernetes Operator 1.9.0.
>
> The Flink Kubernetes Operator allows users to manage their Apache Flink
> applications and their lifecycle through native k8s tooling like kubectl.
>
> Release blogpost:
> https://flink.apache.org/2024/07/02/apache-flink-kubernetes-operator-1.9.0-release-announcement/
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Maven artifacts for Flink Kubernetes Operator can be found at:
> https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator
>
> Official Docker image for Flink Kubernetes Operator can be found at:
> https://hub.docker.com/r/apache/flink-kubernetes-operator
>
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354417
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
> Gyula Fora
>


[ANNOUNCE] Apache Flink Kubernetes Operator 1.9.0 released

2024-07-03, posted by Gyula Fóra
The Apache Flink community is very happy to announce the release of Apache
Flink Kubernetes Operator 1.9.0.

The Flink Kubernetes Operator allows users to manage their Apache Flink
applications and their lifecycle through native k8s tooling like kubectl.

Release blogpost:
https://flink.apache.org/2024/07/02/apache-flink-kubernetes-operator-1.9.0-release-announcement/

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Kubernetes Operator can be found at:
https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator

Official Docker image for Flink Kubernetes Operator can be found at:
https://hub.docker.com/r/apache/flink-kubernetes-operator

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354417

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Gyula Fora


Re: Re: Re: Re: flink 1.18 on YARN: submitting a job creates multiple applications

2024-07-02, posted by Liu Join
Thank you!!!


Best regards,
Liu



Re: Re: Re: flink 1.18 on YARN: submitting a job creates multiple applications

2024-07-02, posted by Xuyang
You can refer to [1].


Tip: the newer community syntax is EXECUTE STATEMENT SET BEGIN ... END; alternatively, BEGIN STATEMENT SET; ... END; also works.




[1]https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/insert/#insert-into-multiple-tables
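
For readers who want the concrete shape of that suggestion, here is a minimal sketch of the Table API variant. The table names src, doris_sink_a and doris_sink_b are hypothetical, and their DDL is assumed to be registered elsewhere; grouping both INSERTs into one statement set makes them a single job, so a per-job submission to YARN produces only one application.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.StatementSet;
    import org.apache.flink.table.api.TableEnvironment;

    public class SingleJobWithStatementSet {
        public static void main(String[] args) {
            TableEnvironment tableEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // DDL for the hypothetical 'src', 'doris_sink_a' and 'doris_sink_b'
            // tables is assumed to have been executed here.

            // Both INSERTs are planned and submitted as a single job
            // (and therefore a single YARN application), instead of one
            // application per executed INSERT.
            StatementSet statementSet = tableEnv.createStatementSet();
            statementSet.addInsertSql("INSERT INTO doris_sink_a SELECT * FROM src");
            statementSet.addInsertSql("INSERT INTO doris_sink_b SELECT * FROM src");
            statementSet.execute();

            // The pure-SQL equivalents mentioned above are:
            //   EXECUTE STATEMENT SET BEGIN <insert>; <insert>; END;
            //   BEGIN STATEMENT SET; <insert>; <insert>; END;   (SQL client)
        }
    }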







--

Best!
Xuyang







Re: Re: flink 1.18 on YARN: submitting a job creates multiple applications

2024-07-01, posted by Liu Join
Hi, thanks for the reply. Is there documentation for this on the official site? I'd like to understand the details. Thanks!



Re: flink 1.18 on YARN: submitting a job creates multiple applications

2024-07-01, posted by Xuyang
Hi, if you don't want two applications to appear, try using a statement set to group the two DML statements together. Otherwise they are treated as two separate operations and split into two applications.




sql: begin statement set; ...  end;

java & scala table api: tableEnv#createStatementSet




--

Best!
Xuyang







Re: This definitely counts as a bug

2024-07-01, posted by Yan Zhou
Unsubscribe



flink 1.18 on YARN: submitting a job creates multiple applications

2024-07-01, posted by Liu Join
Version: flink 1.18, hadoop 3.0.0
Submission mode: per-job

Problem:
1. A program written in Flink SQL, reading from one source, defines one Doris sink plus a print of a table converted to a stream. Submitting the jar to YARN creates two applications: one whose output is the Doris sink, and another whose output is the print of the stream-to-table conversion.

2. A program written in Flink SQL, reading from one source, defines two Doris sinks, table a and table b. Submitting the jar to YARN creates two applications: one outputs to Doris sink table a, the other to Doris sink table b.


What is the reason for this?


Re: unsubscribe

2024-07-01, posted by wjw_bigd...@163.com
Unsubscribe

 Original message 
| From | wjw_bigd...@163.com |
| Date | 2024-07-01 17:17 |
| To | user-zh |
| Subject | Re: unsubscribe |

 Original message 
| From | zhanggongx |
| Date | 2024-07-01 16:52 |
| To | user-zh |
| Subject | unsubscribe |


Re: This definitely counts as a bug

2024-07-01, posted by wjw_bigd...@163.com
Unsubscribe


Re: This definitely counts as a bug

2024-07-01, posted by Cuixb
The GC pauses can't be called long, but they are definitely not short either. A rough calculation: with 24 GB of memory there is about 10 seconds of unresponsiveness, mostly staying within 10 seconds.
Sent from my iPhone




Re: This definitely counts as a bug

2024-07-01, posted by rui chen
I'd suggest checking the JM's GC behavior.



Re: Re: This definitely counts as a bug

2024-07-01, posted by wjw_bigd...@163.com
Unsubscribe

 Original message 
| From | wjw_bigd...@163.com |
| Date | 2024-07-01 17:13 |
| To | user-zh |
| Subject | Re: This definitely counts as a bug |

 Original message 
| From | <2278179...@qq.com.INVALID> |
| Date | 2024-06-29 21:31 |
| To | user-zh |
| Subject | Re: This definitely counts as a bug |



Re: unsubscribe

2024-07-01, posted by wjw_bigd...@163.com
Unsubscribe

 Original message 
| From | wjw_bigd...@163.com |
| Date | 2024-07-01 17:12 |
| To | user-zh |
| Subject | Re: unsubscribe |

 Original message 
| From | zhanggongx |
| Date | 2024-07-01 16:52 |
| To | user-zh |
| Subject | unsubscribe |


Re: Re: This definitely counts as a bug

2024-07-01, posted by wjw_bigd...@163.com
Unsubscribe

 Original message 
| From | <2278179...@qq.com.INVALID> |
| Date | 2024-06-29 21:31 |
| To | user-zh |
| Subject | Re: This definitely counts as a bug |



Re: unsubscribe

2024-07-01, posted by wjw_bigd...@163.com
Unsubscribe

 Original message 
| From | zhanggongx |
| Date | 2024-07-01 16:52 |
| To | user-zh |
| Subject | unsubscribe |


Re: This definitely counts as a bug

2024-06-29, posted by Cuixb
The connection is fine. The main thing is that the TM is continuously processing a write stream. I also checked the load, and it is actually not high, but the TM simply stops responding, which leads to the reported timeout and then the error at the very beginning!
Sent from my iPhone

> On June 29, 2024, at 16:49, Zhanghao Chen wrote:
> 
> Hi, from the error it looks like the JM lost leadership, which caused all tasks on the TM to be shut down. Could you check whether the JM side has an HA connection problem?
> 
> Best,
> Zhanghao Chen
> 



Re: This definitely counts as a bug

2024-06-29, posted by Zhanghao Chen
Hi, from the error it looks like the JM lost leadership, which caused all tasks on the TM to be shut down. Could you check whether the JM side has an HA connection problem?

Best,
Zhanghao Chen



This definitely counts as a bug

2024-06-28, posted by Cuixb
Production environment: Flink 1.16.2

2024-06-29 09:17:23
java.lang.Exception: Job leader for job id 8ccdd299194a686e3ecda602c3c75bf3 
lost leadership.
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:2310)
at java.util.Optional.ifPresent(Optional.java:159)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:2308)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(For

Sent from my iPhone


Re: unsubscribe

2024-06-28, posted by wjw_bigdata
Unsubscribe


 Original message 
| From | 张胜军<15910621...@139.com> |
| Date | June 28, 2024 12:57 |
| To | user-zh |
| Subject | unsubscribe |


Unsubscribe































Unsubscribe

2024-06-27, posted by 张胜军
Unsubscribe





























Re: Re: How to parse data read from Oracle via CDC

2024-06-26, posted by Yanquan Lv
Yes, this is possible. Setting Debezium's decimal.handling.mode [1] parameter achieves what you need; the values can be converted to double or string for handling.

[1]
https://debezium.io/documentation/reference/1.9/connectors/oracle.html#oracle-numeric-types
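
As a rough sketch of how that Debezium option could be wired into the Oracle CDC source shown earlier in this thread (connection values are the same placeholders as in the original snippet; imports are omitted because the package path depends on the Flink CDC version in use):

    // Hypothetical sketch: pass decimal.handling.mode through the Debezium
    // properties so NUMBER/DECIMAL columns arrive as plain strings (or doubles)
    // instead of Base64-encoded VariableScaleDecimal structs.
    Properties debeziumProperties = new Properties();
    // "string" keeps full precision; "double" is simpler but may lose precision.
    debeziumProperties.setProperty("decimal.handling.mode", "string");

    JdbcIncrementalSource<String> oracleChangeEventSource =
            new OracleSourceBuilder()
                    .hostname("host")            // placeholder
                    .port(1521)
                    .databaseList("ORCLCDB")
                    .schemaList("DEBEZIUM")
                    .tableList("DEBEZIUM.PRODUCTS")
                    .username("username")
                    .password("password")
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    .startupOptions(StartupOptions.initial())
                    .debeziumProperties(debeziumProperties)
                    .splitSize(2)
                    .build();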

ha.fen...@aisino.com wrote on Wednesday, June 26, 2024 at 16:35:

> Map customConverterConfigs = new HashMap<>();
> customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
> "numeric");
> With the settings above, PRICE (a DECIMAL column) now comes out correctly, but ID is a NUMBER and still shows "ID":{"scale":0,"value":"AQ=="}.
> I tried it: my ID is 1, which is MQ== after Base64, so this ID value is not even the Base64 of the raw value.
> Printing the schema via JsonDebeziumDeserializationSchema(true, customConverterConfigs) gives:
> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"scale"},{"type":"bytes","optional":false,"field":"value"}],"optional":false,"name":"io.debezium.data.VariableScaleDecimal","version":1,"doc":"Variable
> scaled decimal","field":"ID"},
> Is there a way to set the corresponding field types up front and pass them in?
>


Re: unsubscribe

2024-06-26, posted by wjw_bigd...@163.com
Unsubscribe

 Original message 
| From | <402987...@qq.com.INVALID> |
| Date | 2024-06-26 16:38 |
| To | user-zh |
| Subject | unsubscribe |

 Original message 
| From | "user-zh" <15171440...@163.com> |
| Date | 2024-06-26 16:36 |
| To | user-zh |
| Subject | unsubscribe |

Re: unsubscribe

2024-06-26, posted by 费文杰





















Re: How to parse data read from Oracle via CDC

2024-06-26, posted by 15868861416
Hi, try changing the types of ID and PRICE to NUMBER. In my tests with flink-sql, NUMBER columns map to Iceberg DECIMAL data correctly.


| |
博星
|
|
15868861...@163.com
|



Re: Re: How to parse data read from Oracle via CDC

2024-06-26, posted by Yanquan Lv
Hi, are your ID and PRICE columns of DECIMAL type? DECIMAL values are rendered using Base64 by default.

You can add the code below to make the output more readable.

Map customConverterConfigs = new HashMap<>();
customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
"numeric");
JsonDebeziumDeserializationSchema schema = new
JsonDebeziumDeserializationSchema(includeSchema,
customConverterConfigs);






Re: How to parse data read from Oracle via CDC

2024-06-25, posted by wjw_bigd...@163.com
Unsubscribing is really a hassle... I've tried several times and still haven't figured it out.


Re: How to parse data read from Oracle via CDC

2024-06-25, posted by 15868861416
Check the data in the DEBEZIUM.PRODUCTS table in Oracle; your parsed output contains strings.




| |
博星
|
|
15868861...@163.com
|


 Original message 
| From | ha.fen...@aisino.com |
| Date | June 25, 2024 15:54 |
| To | user-zh |
| Subject | How to parse data read from Oracle via CDC |
Code based on the documentation:
JdbcIncrementalSource oracleChangeEventSource =
new OracleSourceBuilder()
.hostname("host")
.port(1521)
.databaseList("ORCLCDB")
.schemaList("DEBEZIUM")
.tableList("DEBEZIUM.PRODUCTS")
.username("username")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.includeSchemaChanges(true) // output the schema changes as well
.startupOptions(StartupOptions.initial())
.debeziumProperties(debeziumProperties)
.splitSize(2)
.build();
The returned result:
{"before":null,"after":{"ID":{"scale":0,"value":"AQ=="},"NAME":"aa","ADDTIME":1719325293000,"PRICE":"BMY="},"source":{"version":"1.9.8.Final","connector":"oracle","name":"oracle_logminer","ts_ms":0,"snapshot":"false","db":"ORCL","sequence":null,"schema":"CDC","table":"ORDER_CDC","txId":null,"scn":"0","commit_scn":null,"lcr_position":null,"rs_id":null,"ssn":0,"redo_thread":null},"op":"r","ts_ms":1719301769186,"transaction":null}

How should this be parsed? PRICE should be a number and ID is also a number, but what is shown here is different for both.


How can Flink's async writer apply a fixed-rate limit? There seems to be a bug here

2024-06-25, posted by jinzhuguang
Flink 1.16.0

I found a related community article; its example is as follows:
https://flink.apache.org/2022/11/25/optimising-the-throughput-of-async-sinks-using-a-custom-ratelimitingstrategy/#rationale-behind-the-ratelimitingstrategy-interface


public class TokenBucketRateLimitingStrategy implements RateLimitingStrategy {

    private final Bucket bucket;

    public TokenBucketRateLimitingStrategy() {
        Refill refill = Refill.intervally(1, Duration.ofSeconds(1));
        Bandwidth limit = Bandwidth.classic(10, refill);
        this.bucket = Bucket4j.builder()
                .addLimit(limit)
                .build();
    }

    // ... (information methods not needed)

    @Override
    public boolean shouldBlock(RequestInfo requestInfo) {
        return bucket.tryConsume(requestInfo.getBatchSize());
    }
}



But the return value of shouldBlock here seems to be inverted. When I actually used it, I found that the async thread pool's queue fills up very quickly and a RejectedExecutionException is thrown.
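
If that observation is correct, my reading of the RateLimitingStrategy contract (an assumption, not something confirmed in this thread) is that shouldBlock must return true when the request has to wait, so the result of tryConsume would need to be negated:

    // Hypothetical fix for the snippet above: block only when the token bucket
    // could NOT provide enough tokens for this batch.
    @Override
    public boolean shouldBlock(RequestInfo requestInfo) {
        return !bucket.tryConsume(requestInfo.getBatchSize());
    }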



Re: When inserting data with the HBase connector, how do I update only one column when a column family has several?

2024-06-20, posted by 15868861416
Try this example for reference:
CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  `proc_time` AS PROCTIME()
) WITH (
  'connector'='datagen'
);

CREATE TEMPORARY TABLE hbase_dim (
  rowkey INT,
  family1 ROW,
  family2 ROW,
  family3 ROW
) WITH (
  'connector'='cloudhbase',
  'table-name'='',
  'zookeeper.quorum'=''
);

CREATE TEMPORARY TABLE blackhole_sink(
  a INT,
  f1c1 INT,
  f3c3 STRING
) WITH (
  'connector'='blackhole'
);

INSERT INTO blackhole_sink
 SELECT a, family1.col1 as f1c1,  family3.col3 as f3c3 FROM datagen_source
JOIN hbase_dim FOR SYSTEM_TIME AS OF datagen_source.`proc_time` as h ON 
datagen_source.a = h.rowkey;


| |
博星
|
|
15868861...@163.com
|


 Original message 
| From | xiaohui zhang |
| Date | June 20, 2024 10:03 |
| To |  |
| Subject | Re: When inserting data with the HBase connector, how do I update only one column when a column family has several? |
When Flink writes, every field defined in the DDL must be written at the same time; using only some of the fields in the SQL statement is not supported.
If you are sure you only need to write part of the data, define only the part you actually use in the DDL.













Re: When inserting data with the HBase connector, how do I update only one column when a column family has several?

2024-06-19, posted by xiaohui zhang
When Flink writes, every field defined in the DDL must be written at the same time; using only some of the fields in the SQL statement is not supported.
If you are sure you only need to write part of the data, define only the part you actually use in the DDL.
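
As an illustration only (the qualifier name col1 is made up, since the real column names are not shown in the thread), a narrower table definition that maps just the one qualifier to be written; HBase puts do not touch qualifiers that are absent from the write:

    // Hypothetical sketch: expose only cf1.col1, so an INSERT through this table
    // updates just that qualifier and leaves the other qualifiers of cf1 alone.
    tableEnv.executeSql(
            "CREATE TABLE hbase_cf1_col1_only (" +
            "  rowkey STRING," +
            "  cf1 ROW<col1 STRING>," +
            "  PRIMARY KEY (rowkey) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'hbase-2.2'," +
            "  'table-name' = 'test_db:test_table_t1'," +
            "  'zookeeper.quorum' = 'xxx:2181'," +
            "  'zookeeper.znode.parent' = '/hbase'" +
            ")");

    tableEnv.executeSql(
            "INSERT INTO hbase_cf1_col1_only " +
            "SELECT '002' AS rowkey, ROW('xxd_2') AS cf1");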


zboyu0104  于2024年6月14日周五 15:43写道:

> 怎么退订
> from 阿里邮箱
> iPhone--
> 发件人:谢县东
> 日 期:2024年06月06日 16:07:05
> 收件人:
> 主 题:使用hbase连接器插入数据,一个列族下有多列时如何只更新其中一列
>
> 各位好:
>
>
> flink版本: 1.13.6
> 我在使用 flink-connector-hbase 连接器,通过flinkSQL 将数据写入hbase,hbase 建表如下:
>
>
> CREATE TABLE hbase_test_db_test_table_xxd (
> rowkey STRING,
> cf1 ROW,
> PRIMARY KEY (rowkey) NOT ENFORCED
> ) WITH (
> 'connector' = 'hbase-2.2',
> 'table-name' = 'test_db:test_table_t1',
> 'zookeeper.quorum' = 'xxx:2181',
> 'zookeeper.znode.parent' = '/hbase',
> 'null-string-literal' = '',
> 'sink.parallelism' = '2'
> );
>
>
> hbase cf1列族下有三列,看官网示例插入数据时需要构建一个row类型插入(row类型需包含列族下的所有列)
> INSERT INTO hbase_test_db_test_table_xxd  select '002' as rowkey,
> row('xxd_2', 'boy', '10') as cf1;
>
>
>
>
> 如果只想更新其中某一列如何实现?在flink中新建一个hbase表吗?
>
>
>
>
>
>
>
>
>


Re: How can Flink dynamically join n out of several dimension tables?

2024-06-19, posted by xiaohui zhang
A lookup join can join multiple dimension tables, but updates to a dimension table do not trigger a refresh of historical data.
When joining several dimension tables, you also need to consider the latency added by the repeated lookups and the query TPS pressure on the dimension-table databases.
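
To make the pattern concrete, a minimal sketch (table and column names are hypothetical; fact_stream is assumed to have a processing-time attribute proc_time, and dim_table is a lookup-capable dimension table such as a JDBC or HBase table):

    // Hypothetical lookup-join sketch: each fact row is enriched with the
    // dimension value as of its processing time, so later dimension updates
    // only affect rows processed afterwards.
    tableEnv.executeSql(
            "INSERT INTO dws_sink " +
            "SELECT f.id, f.amount, d.dim_name " +
            "FROM fact_stream AS f " +
            "JOIN dim_table FOR SYSTEM_TIME AS OF f.proc_time AS d " +
            "  ON f.dim_id = d.id");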

斗鱼 <1227581...@qq.com.invalid> wrote on Wednesday, June 19, 2024 at 23:12:

> OK, thanks for the reply. I had heard that Flink's lookup join can apparently implement similar logic; I just don't know whether a lookup join supports multiple dynamic dimension tables.
>
> 斗鱼
> 1227581...@qq.com

Re: How can Flink dynamically join n out of several dimension tables?

2024-06-19, posted by xiaohui zhang
Do you need to refresh the historical fact table after a dimension table is updated? Doing that with Flink is almost impossible, especially when multiple dimension tables are involved: every time a dimension table is updated, you would have to find the related records across the entire history and rewrite them. Whether in terms of state storage or update volume, the required resources are far too high to handle.
In our current real-time wide-table applications, the real-time part is generally transaction-style data, and the dimension information it picks up should be the data as of the moment the business fact occurred.
Refreshing facts after a dimension change is usually done as a nightly batch update. If there is a hard real-time requirement, the only option is to join the dimension table again at query time to get the latest value.



Re: [ANNOUNCE] Apache Flink CDC 3.1.1 released

2024-06-18, posted by Paul Lam
Well done! Thanks a lot for your hard work!

Best,
Paul Lam

> On June 19, 2024, at 09:47, Leonard Xu wrote:
> 
> Congratulations! Thanks Qingsheng for the release work and all contributors 
> involved.
> 
> Best,
> Leonard 
> 
>> On June 18, 2024, at 11:50 PM, Qingsheng Ren wrote:
>> 
>> The Apache Flink community is very happy to announce the release of Apache
>> Flink CDC 3.1.1.
>> 
>> Apache Flink CDC is a distributed data integration tool for real time data
>> and batch data, bringing the simplicity and elegance of data integration
>> via YAML to describe the data movement and transformation in a data
>> pipeline.
>> 
>> Please check out the release blog post for an overview of the release:
>> https://flink.apache.org/2024/06/18/apache-flink-cdc-3.1.1-release-announcement/
>> 
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>> 
>> Maven artifacts for Flink CDC can be found at:
>> https://search.maven.org/search?q=g:org.apache.flink%20cdc
>> 
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354763
>> 
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>> 
>> Regards,
>> Qingsheng Ren
> 



Re: [ANNOUNCE] Apache Flink CDC 3.1.1 released

2024-06-18, posted by Leonard Xu
Congratulations! Thanks Qingsheng for the release work and all contributors 
involved.

Best,
Leonard 

> On June 18, 2024, at 11:50 PM, Qingsheng Ren wrote:
> 
> The Apache Flink community is very happy to announce the release of Apache
> Flink CDC 3.1.1.
> 
> Apache Flink CDC is a distributed data integration tool for real time data
> and batch data, bringing the simplicity and elegance of data integration
> via YAML to describe the data movement and transformation in a data
> pipeline.
> 
> Please check out the release blog post for an overview of the release:
> https://flink.apache.org/2024/06/18/apache-flink-cdc-3.1.1-release-announcement/
> 
> The release is available for download at:
> https://flink.apache.org/downloads.html
> 
> Maven artifacts for Flink CDC can be found at:
> https://search.maven.org/search?q=g:org.apache.flink%20cdc
> 
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354763
> 
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
> 
> Regards,
> Qingsheng Ren



[ANNOUNCE] Apache Flink CDC 3.1.1 released

2024-06-18, posted by Qingsheng Ren
The Apache Flink community is very happy to announce the release of Apache
Flink CDC 3.1.1.

Apache Flink CDC is a distributed data integration tool for real time data
and batch data, bringing the simplicity and elegance of data integration
via YAML to describe the data movement and transformation in a data
pipeline.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/2024/06/18/apache-flink-cdc-3.1.1-release-announcement/

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink CDC can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20cdc

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354763

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Qingsheng Ren


Discussion of Flink checkpoint latency and performance

2024-06-16, posted by 15868861416
Hi all,
Background:
In an actual test, Flink reads data from Kafka and writes it to Hudi. The checkpoint interval is 1 min and state.backend is filesystem. The test results are as follows:



Checkpoint latency when writing to Hudi:





Checkpoint latency when writing to Iceberg:



Question: the checkpoint file data for Hudi is much larger than for Iceberg. How can the checkpoint latency of Flink writing to Hudi be reduced?


| |
博星
|
|
15868861...@163.com
|



Re: How can Flink dynamically join n out of several dimension tables?

2024-06-16, posted by 王旭
Happy to trade notes; we are working on a similar change as well.
1. If you don't know in advance how many dimension tables need to be joined, could you simply join all of them, and then decide which dimension tables' data to use based on fields in the driving record, similar to a left join?
2. For the scenario where the result table must also be refreshed when a dimension table changes: you mentioned the dimension tables are at the level of hundreds of millions of rows, so the fact table is presumably even larger. Joining back against the full fact table does not feel like a good fit for stream processing; if only part of it needs refreshing, the last n days of data could be staged in external storage.

 Original message 
| From | 斗鱼 <1227581...@qq.com.INVALID> |
| Date | 2024-06-16 21:08 |
| To | user-zh |
| Subject | Re: How can Flink dynamically join n out of several dimension tables? |
We are still at the research stage; either SQL or DataStream would work. Our DWD and dimension tables are currently kept in ClickHouse/Doris. We are designing the future architecture and have not implemented it yet; I just wanted to ask for advice.


??????Flink????????????????join??????????????n??????

2024-06-16 文章 ????
??flink sql apidatastream api??



  
| 发件人 | <1227581...@qq.com.INVALID> |
| 发送日期 | 2024年06月16日 20:35 |
| 收件人 | user-zh |
| 抄送人 | |
| 主题 | Flinkjoin??n?? |
??
1DWD??KafkaDWD
2Kafka
3??FlinkKafka1,2??FlinkKafka??DWD12DWS??


DWS







|
|

1227581...@qq.com
|
 

Flink????????????????join??????????????n??????

2024-06-16 文章 ????
??
1DWD??KafkaDWD
2Kafka
3??FlinkKafka1,2??FlinkKafka??DWD12DWS??


DWS










1227581...@qq.com





Re:使用hbase连接器插入数据,一个列族下有多列时如何只更新其中一列

2024-06-14 文章 zboyu0104
怎么退订
from 阿里邮箱 
iPhone--
发件人:谢县东
日 期:2024年06月06日 16:07:05
收件人:
主 题:使用hbase连接器插入数据,一个列族下有多列时如何只更新其中一列

各位好:


flink版本: 1.13.6
我在使用 flink-connector-hbase 连接器,通过flinkSQL 将数据写入hbase,hbase 建表如下:


CREATE TABLE hbase_test_db_test_table_xxd (
rowkey STRING,
cf1 ROW,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'test_db:test_table_t1',
'zookeeper.quorum' = 'xxx:2181',
'zookeeper.znode.parent' = '/hbase',
'null-string-literal' = '',
'sink.parallelism' = '2'
);


hbase cf1列族下有三列,看官网示例插入数据时需要构建一个row类型插入(row类型需包含列族下的所有列)
INSERT INTO hbase_test_db_test_table_xxd  select '002' as rowkey, row('xxd_2', 
'boy', '10') as cf1;




如果只想更新其中某一列如何实现?在flink中新建一个hbase表吗?










Re: flink cdc 3.0 schema变更问题

2024-06-13 文章 Yanquan Lv
你好,DataStream 的方式需要设置 includeSchemaChanges(true) 参数,并且设置自定义的
deserializer,参考这个链接[1]。
如果不想使用 json 的方式,希望自定义 deserializer,从 SourceRecord 里提取 ddl
的方式可以参考这个链接[2]提供的方案。

[1]
https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/faq/faq/#q6-i-want-to-get-ddl-events-in-the-database-what-should-i-do-is-there-a-demo
[2] https://developer.aliyun.com/article/1093413#slide-2

zapjone  于2024年6月13日周四 12:29写道:

> 大佬们好:
> 想请教下,在flink
> cdc3.0中支持schema变更,但看到是pipeline方式的,因业务问题需要使用datastream进行特殊处理,所以想请教下,在flink
> cdc 3.0中datastream api中怎么使用schema变更呢?或者相关文档呢?
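
下面给出一个按上述方式的最小 DataStream 示例(includeSchemaChanges 加 JSON 反序列化器),假设使用 2.x/3.0 的 com.ververica.cdc 包名;主机、账号、库表名均为占位,仅作示意:

~~~java
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SchemaChangeDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("localhost")                      // placeholder
                .port(3306)
                .databaseList("mydb")                       // placeholder
                .tableList("mydb.orders")                   // placeholder
                .username("user")
                .password("pass")
                .includeSchemaChanges(true)                 // also emit DDL (schema change) events
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC")
                // DDL and DML records can be told apart by inspecting the JSON payload,
                // e.g. whether it carries the DDL/history part (see the FAQ link above)
                .print();

        env.execute("cdc-schema-change-demo");
    }
}
~~~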


Re: flink cdc 3.0 schema变更问题

2024-06-12 文章 Xiqian YU
Zapjone 好,

目前的 Schema Evolution 的实现依赖传递 CDC Event 事件的 Pipeline 连接器和框架。如果您希望插入自定义算子逻辑,建议参考 
flink-cdc-composer 模块中的 FlinkPipelineComposer 类构建算子链作业的方式,并在其中插入自定义的 Operator 
以实现您的业务逻辑。

另外,对于一些简单的处理逻辑,如果能够使用 YAML 作业的 Route(路由)、Transform(变换)功能表述的话,直接编写对应的 YAML 
规则会更简单。

祝好!

Regards,
yux

De : zapjone 
Date : jeudi, 13 juin 2024 à 12:29
À : user-zh@flink.apache.org 
Objet : flink cdc 3.0 schema变更问题
大佬们好:
想请教下,在flink 
cdc3.0中支持schema变更,但看到是pipeline方式的,因业务问题需要使用datastream进行特殊处理,所以想请教下,在flink cdc 
3.0中datastream api中怎么使用schema变更呢?或者相关文档呢?


flink cdc 3.0 schema变更问题

2024-06-12 文章 zapjone
大佬们好:
想请教下,在flink 
cdc3.0中支持schema变更,但看到是pipeline方式的,因业务问题需要使用datastream进行特殊处理,所以想请教下,在flink cdc 
3.0中datastream api中怎么使用schema变更呢?或者相关文档呢?

Re:changelogstream删除问题

2024-06-12 文章 Xuyang
Hi, 你可以试一下用statement set[1],将这个query同时写入到print sink中吗?




在tm日志里可以查看到print 
sink的结果,看看里面有没有-D类型的数据。如果没有的话,证明是test_changelog源表可能就没有-D的数据;如果有的话,就需要后续进一步排查sink表在ds和sql上的行为差异。







[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/insert/#insert-into-multiple-tables

[2] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/print/




--

Best!
Xuyang





在 2024-06-11 10:39:10,"zapjone"  写道:
>大佬们好:
>   我使用datastream 
> api进行实时读取mysql数据时,通过tableEnv.fromChangelogStream将datastram转换成了变更表,在使用sql将变更表数据写入数据湖中。
> 但经过测试,insert、update都可以正常实现,但delete无法实现删除操作,使用sql进行测试时,可以实现删除操作。(因有些逻辑需要api操作,就没有使用sql方式实现)。
>代码:
>StreamExecutionEnvironment env = ...;
>StreamTableEnvironment tableEnv = ...;
>MySqlSource mysqlSource = MysqlSouce.builder()build();
>SingleOutputStramOperator datastream = 
>env.fromSource(mysqlSource).flatMap();// 手动解析信息,并转换为Row类型,带有RowKind
>tableEnv.createTemporaryView("test_changelog",tableEnv.fromChangelogStream(datastream));
>tableEnv.executeSql("insert into xxx select * from test_changelog");
>
>
>
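
按上面建议的 statement set 验证方式,给出一个最小示意(假设 tableEnv 和引文中的 test_changelog 视图已存在;print 表的字段为占位,需与 test_changelog 的 schema 一致):

~~~java
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class VerifyDeletes {
    public static void check(StreamTableEnvironment tableEnv) {
        // side print sink for debugging; column list is a placeholder
        tableEnv.executeSql(
                "CREATE TABLE debug_print (id BIGINT, name STRING) WITH ('connector' = 'print')");

        StatementSet set = tableEnv.createStatementSet();
        set.addInsertSql("INSERT INTO xxx SELECT * FROM test_changelog");          // original lake sink
        set.addInsertSql("INSERT INTO debug_print SELECT * FROM test_changelog");  // -D rows show up in TM logs
        set.execute();
    }
}
~~~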


changelogstream删除问题

2024-06-10 文章 zapjone
大佬们好:
   我使用datastream 
api进行实时读取mysql数据时,通过tableEnv.fromChangelogStream将datastram转换成了变更表,在使用sql将变更表数据写入数据湖中。
 
但经过测试,insert、update都可以正常实现,但delete无法实现删除操作,使用sql进行测试时,可以实现删除操作。(因有些逻辑需要api操作,就没有使用sql方式实现)。
代码:
StreamExecutionEnvironment env = ...;
StreamTableEnvironment tableEnv = ...;
MySqlSource mysqlSource = MysqlSouce.builder()build();
SingleOutputStramOperator datastream = 
env.fromSource(mysqlSource).flatMap();// 手动解析信息,并转换为Row类型,带有RowKind
tableEnv.createTemporaryView("test_changelog",tableEnv.fromChangelogStream(datastream));
tableEnv.executeSql("insert into xxx select * from test_changelog");





使用hbase连接器插入数据,一个列族下有多列时如何只更新其中一列

2024-06-06 文章 谢县东
各位好:


flink版本: 1.13.6
我在使用 flink-connector-hbase 连接器,通过flinkSQL 将数据写入hbase,hbase 建表如下:


CREATE TABLE hbase_test_db_test_table_xxd (
rowkey STRING,
cf1 ROW,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'test_db:test_table_t1',
'zookeeper.quorum' = 'xxx:2181',
'zookeeper.znode.parent' = '/hbase',
'null-string-literal' = '',
'sink.parallelism' = '2'
);


hbase cf1列族下有三列,看官网示例插入数据时需要构建一个row类型插入(row类型需包含列族下的所有列)
INSERT INTO hbase_test_db_test_table_xxd  select '002' as rowkey, row('xxd_2', 
'boy', '10') as cf1;




如果只想更新其中某一列如何实现?在flink中新建一个hbase表吗?









Re:Re:Re: 请问flink sql作业如何给kafka source table消费限速?

2024-06-05 文章 Xuyang
Hi, 现在flink sql还没有办法限流。有需求的话可以建一个jira[1],在社区推进下。




[1] https://issues.apache.org/jira/projects/FLINK/issues




--

Best!
Xuyang





在 2024-06-05 15:33:30,"casel.chen"  写道:
>flink sql作业要如何配置进行限流消费呢?以防止打爆存储系统
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2024-06-05 14:46:23,"Alex Ching"  写道:
>>从代码上看,Flink
>>内部是有限速的组件的。org.apache.flink.api.common.io.ratelimiting.GuavaFlinkConnectorRateLimiter,
>>但是并没有在connector中使用。
>>
>>casel.chen  于2024年6月5日周三 14:36写道:
>>
>>> kafka本身是支持消费限流的[1],但这些消费限流参数在flink kafka sql
>>> connector中不起作用,请问这是为什么?如何才能给flink kafka source table消费限速? 谢谢!
>>>
>>>
>>> [1] https://blog.csdn.net/qq_37774171/article/details/122816246
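
在连接器支持该选项之前,一种 DataStream 层面的临时做法是在 source 后面加一个限速算子,靠反压让 Kafka 消费变慢。下面只是这个思路的示意(按子任务限速,类名和速率都是随意取的),并不是 Kafka SQL connector 提供的功能:

~~~java
import org.apache.flink.api.common.functions.RichMapFunction;

// Per-subtask throttle: lets at most maxPerSecond records pass through each
// parallel instance per second by sleeping, which back-pressures the source.
public class ThrottleMap<T> extends RichMapFunction<T, T> {

    private final long maxPerSecond;
    private transient long windowStartMs;
    private transient long emitted;

    public ThrottleMap(long maxPerSecond) {
        this.maxPerSecond = maxPerSecond;
    }

    @Override
    public T map(T value) throws Exception {
        long now = System.currentTimeMillis();
        if (now - windowStartMs >= 1000) {          // start a new one-second window
            windowStartMs = now;
            emitted = 0;
        }
        if (++emitted > maxPerSecond) {             // budget used up: wait for the window to end
            Thread.sleep(Math.max(0, 1000 - (now - windowStartMs)));
            windowStartMs = System.currentTimeMillis();
            emitted = 0;
        }
        return value;
    }
}

// usage: env.fromSource(kafkaSource, watermarks, "kafka").map(new ThrottleMap<>(1000)).sinkTo(...);
~~~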


Re:Re: 请问flink sql作业如何给kafka source table消费限速?

2024-06-05 文章 casel.chen
flink sql作业要如何配置进行限流消费呢?以防止打爆存储系统

















在 2024-06-05 14:46:23,"Alex Ching"  写道:
>从代码上看,Flink
>内部是有限速的组件的。org.apache.flink.api.common.io.ratelimiting.GuavaFlinkConnectorRateLimiter,
>但是并没有在connector中使用。
>
>casel.chen  于2024年6月5日周三 14:36写道:
>
>> kafka本身是支持消费限流的[1],但这些消费限流参数在flink kafka sql
>> connector中不起作用,请问这是为什么?如何才能给flink kafka source table消费限速? 谢谢!
>>
>>
>> [1] https://blog.csdn.net/qq_37774171/article/details/122816246


Re: 请问flink sql作业如何给kafka source table消费限速?

2024-06-05 文章 Alex Ching
从代码上看,Flink
内部是有限速的组件的。org.apache.flink.api.common.io.ratelimiting.GuavaFlinkConnectorRateLimiter,
但是并没有在connector中使用。

casel.chen  于2024年6月5日周三 14:36写道:

> kafka本身是支持消费限流的[1],但这些消费限流参数在flink kafka sql
> connector中不起作用,请问这是为什么?如何才能给flink kafka source table消费限速? 谢谢!
>
>
> [1] https://blog.csdn.net/qq_37774171/article/details/122816246


请问flink sql作业如何给kafka source table消费限速?

2024-06-05 文章 casel.chen
kafka本身是支持消费限流的[1],但这些消费限流参数在flink kafka sql connector中不起作用,请问这是为什么?如何才能给flink 
kafka source table消费限速? 谢谢!


[1] https://blog.csdn.net/qq_37774171/article/details/122816246

答复: Flink Datastream实现删除操作

2024-06-04 文章 Xiqian YU
您好,

Iceberg 为 Flink 实现的 connector 同时支持 DataStream API 和 Table API[1]。其 DataStream 
API 提供 Append(默认行为)、Overwrite、Upsert 三种可选的模式,您可以使用下面的 Java 代码片段实现:

首先创建对应数据行 Schema 格式的反序列化器,例如,可以使用 RowDataDebeziumDeserializeSchema 的生成器来快速构造一个:


private RowDataDebeziumDeserializeSchema getDeserializer(
DataType dataType) {
LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
InternalTypeInfo typeInfo = InternalTypeInfo.of(logicalType);
return RowDataDebeziumDeserializeSchema.newBuilder()
.setPhysicalRowType((RowType) dataType.getLogicalType())
.setResultTypeInfo(typeInfo)
.build();
}

然后,您可以使用该反序列化器创建 MySQL 数据源:

MySqlSource<RowData> mySqlSource =
MySqlSource.<RowData>builder()
// 其他参数配置略
.deserializer(getDeserializer({{ ROW_DATA_TYPE_HERE }}))
.build();

并创建一个 Iceberg Sink(注意 FlinkSink.forRowData 接收的是 DataStream<RowData>,需要先从上面的 Source 得到数据流):

Configuration hadoopConf = new Configuration();
TableLoader tableLoader = 
TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);

DataStream<RowData> mysqlStream =
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");

FlinkSink.forRowData(mysqlStream)
.tableLoader(tableLoader)
// 此处可以追加 .overwrite(true) 或 .upsert(true)
// 来配置 Overwrite 或 Upsert 行为
.append();

P.S. 在接下来的 Flink CDC 版本中,预计会为 3.0 版本新增的 Pipeline 作业[2]提供写入 Iceberg 
的能力,使用起来更方便快捷。如果能够满足您的需求,也请多多尝试。

祝好!

Regards,
yux

[1] https://iceberg.apache.org/docs/1.5.2/flink-writes/#writing-with-datastream
[2] 
https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/get-started/introduction/



发件人: zapjone 
日期: 星期二, 2024年6月4日 18:34
收件人: user-zh@flink.apache.org 
主题: Flink Datastream实现删除操作
各位大佬好:
想请教下,在使用mysql-cdc到iceberg,通过sql方式可以实现自动更新和删除功能。但在使用datastream 
api进行处理后,注册成临时表,怎么实现类似于sql方式的自动更新和删除呢?


Flink Datastream实现删除操作

2024-06-04 文章 zapjone
各位大佬好:
想请教下,在使用mysql-cdc到iceberg,通过sql方式可以实现自动更新和删除功能。但在使用datastream 
api进行处理后,注册成临时表,怎么实现类似于sql方式的自动更新和删除呢?

Re: 【求助】关于 Flink ML 迭代中使用keyBy算子报错

2024-06-03 文章 Xiqian YU
您好!
看起来这个问题与 FLINK-35066[1] 有关,该问题描述了在 IterationBody 内实现自定义的RichCoProcessFunction 或 
CoFlatMapFunction 算子时遇到的拆包问题,可以追溯到这个[2]邮件列表中的问题报告。看起来这个问题也同样影响您使用的 
RichCoMapFunction 算子。
该问题已被此 Pull Request[3] 解决,并已合入 master 主分支。按照文档[4]尝试在本地编译 2.4-SNAPSHOT 
快照版本并执行您的代码,看起来能够正常工作。
鉴于这是一个 Flink ML 2.3 版本中的已知问题,您可以尝试在本地编译自己的快照版本,或是等待 Flink ML 2.4 的发布并更新依赖版本。
祝好!

Regards,
yux

[1] https://issues.apache.org/jira/browse/FLINK-35066
[2] https://lists.apache.org/thread/bgkw1g2tdgnp1xy1clsqtcfs3h18pkd6
[3] https://github.com/apache/flink-ml/pull/260
[4] https://github.com/apache/flink-ml#building-the-project


De : w...@shanghaitech.edu.cn 
Date : vendredi, 31 mai 2024 à 17:34
À : user-zh@flink.apache.org 
Objet : 【求助】关于 Flink ML 迭代中使用keyBy算子报错

尊敬的Flink开发者您好,

我在使用Flink ML模块的迭代功能时遇到了一个问题,当我在迭代体内使用keyBy算子时,会出现以下错误:

Caused by: java.lang.ClassCastException: 
org.apache.flink.iteration.IterationRecord cannot be cast to java.lang.String
我已经查阅文档,但还是没有头绪,所以希望能得到您的帮助,非常感谢。

我已在下方附上了最小可复现代码、报错信息以及我的运行环境信息。



以下是最小复现代码

~~~java
package myflinkml;

import org.apache.flink.iteration.DataStreamList;
import org.apache.flink.iteration.IterationBody;
import org.apache.flink.iteration.IterationBodyResult;
import org.apache.flink.iteration.Iterations;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;

public class BugDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

DataStream<String> textStream = env.fromElements("Hello", "Flink");
DataStream<Integer> intStream = env.fromElements(1, 2, 3);

Iterations.iterateUnboundedStreams(
DataStreamList.of(intStream),
DataStreamList.of(textStream),
new Body()

).get(0).print();

env.execute();
}

private static class Body implements IterationBody {

@Override
public IterationBodyResult process(DataStreamList dsl1, DataStreamList 
dsl2) {
DataStream<Integer> intStream = dsl1.get(0);
DataStream<String> textStream = dsl2.get(0);

// 迭代输出流
DataStream<String> outStream = textStream
.connect(intStream)
.keyBy(x -> 1, x -> 1)  // 添加这行就报错!!
.map(new RichCoMapFunction<String, Integer, String>() {

@Override
public String map1(String value) throws Exception {
return "Strings: " + value;
}

@Override
public String map2(Integer value) throws Exception {
return "Integer: " + value;
}
});

// 迭代反馈流
SingleOutputStreamOperator<Integer> feedBackStream = 
intStream.map(x -> x - 1).filter(x -> x > 0);

return new IterationBodyResult(DataStreamList.of(feedBackStream), 
DataStreamList.of(outStream));
}
}
}

~~~

运行报错输出:

Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:267)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1300)
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at 

Re: Re:Flink SQL消费kafka topic有办法限速么?

2024-05-28 文章 Zhanghao Chen
应该是可以的。另外在老版本的 Kafka connector 上,曾经也实现过限速逻辑 [1],可以参考下。这个需求我觉得还比较通用,可以提一个 JIRA。

[1] https://issues.apache.org/jira/browse/FLINK-11501

Best,
Zhanghao Chen

From: casel.chen 
Sent: Tuesday, May 28, 2024 22:00
To: user-zh@flink.apache.org 
Subject: Re:Flink SQL消费kafka topic有办法限速么?

查了下Flink源码,当前DataGeneratorSource有添加RateLimiterStrategy参数,但KafkaSource没有该参数,可以像DataGeneratorSource那样来实现限速么?

public DataGeneratorSource(

GeneratorFunction generatorFunction,

long count,

RateLimiterStrategy rateLimiterStrategy,

TypeInformation typeInfo) {...}

















在 2024-05-27 23:47:40,"casel.chen"  写道:
>Flink SQL消费kafka topic有办法限速么?场景是消费kafka 
>topic数据写入下游mongodb,在业务高峰期时下游mongodb写入压力大,希望能够限速消费kafka,请问要如何实现?


Re:Flink SQL消费kafka topic有办法限速么?

2024-05-28 文章 casel.chen
查了下Flink源码,当前DataGeneratorSource有添加RateLimiterStrategy参数,但KafkaSource没有该参数,可以像DataGeneratorSource那样来实现限速么?

public DataGeneratorSource(

GeneratorFunction generatorFunction,

long count,

RateLimiterStrategy rateLimiterStrategy,

TypeInformation typeInfo) {...}

















在 2024-05-27 23:47:40,"casel.chen"  写道:
>Flink SQL消费kafka topic有办法限速么?场景是消费kafka 
>topic数据写入下游mongodb,在业务高峰期时下游mongodb写入压力大,希望能够限速消费kafka,请问要如何实现?


flink sqlgateway 提交sql作业如何设置组账号

2024-05-28 文章 阿华田


flink sqlgateway 
提交sql作业,发现sqlgateway服务启动后,默认是当前机器的租户信息进行任务提交到yarn集群,由于公司的hadoop集群设置了租户权限,需要设置提交的用户信息,各位大佬,flink
 sqlgateway 提交sql作业如何设置组账号
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



Flink SQL消费kafka topic有办法限速么?

2024-05-27 文章 casel.chen
Flink SQL消费kafka topic有办法限速么?场景是消费kafka 
topic数据写入下游mongodb,在业务高峰期时下游mongodb写入压力大,希望能够限速消费kafka,请问要如何实现?

Re: 关于 mongo db 的splitVector 权限问题

2024-05-23 文章 Jiabao Sun
Hi,

splitVector 是 MongoDB 计算分片的内部命令,在副本集部署模式下也可以使用此命令来计算 chunk 区间。
如果没有 splitVector 权限,会自动降级为 sample 切分策略。

Best,
Jiabao

evio12...@gmail.com  于2024年5月23日周四 16:57写道:

>
> hello~
>
>
> 我正在使用 flink-cdc mongodb connector 2.3.0
> 
>  (
> https://github.com/apache/flink-cdc/blob/release-2.3/docs/content/connectors/mongodb-cdc.md)
> ,
> 文档中指出 mongo 账号需要这些权限 'splitVector', 'listDatabases', 'listCollections',
> 'collStats', 'find', and 'changeStream' ,
>
>
> 我现在使用的mongo是 replica-set , 但是了解到 splitVector 权限主要是对分片集,
> 如果DBA不授权 splitVector , 会有什么影响呢?
>
> --
> evio12...@gmail.com
>


关于 mongo db 的splitVector 权限问题

2024-05-23 文章 evio12...@gmail.com

hello~


我正在使用 flink-cdc mongodb connector 2.3.0 
(https://github.com/apache/flink-cdc/blob/release-2.3/docs/content/connectors/mongodb-cdc.md)
  ,
文档中指出 mongo 账号需要这些权限 'splitVector', 'listDatabases', 'listCollections', 
'collStats', 'find', and 'changeStream' ,


我现在使用的mongo是 replica-set , 但是了解到 splitVector 权限主要是对分片集,
如果DBA不授权 splitVector , 会有什么影响呢?



evio12...@gmail.com


Re:咨询Flink 1.19文档中关于iterate操作

2024-05-20 文章 Xuyang
Hi, 

目前Iterate api在1.19版本上废弃了,不再支持,具体可以参考[1][2]。Flip[1]中提供了另一种替代的办法[3]




[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-357%3A+Deprecate+Iteration+API+of+DataStream

[2] https://issues.apache.org/jira/browse/FLINK-33144

[3] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=184615300




--

Best!
Xuyang





在 2024-05-20 22:39:37,""  写道:
>尊敬的Flink开发团队:
>
>您好!
>
>我目前正在学习如何使用Apache Flink的DataStream API实现迭代算法,例如图的单源最短路径。在Flink 
>1.18版本的文档中,我注意到有关于iterate操作的介绍,具体请见:https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/overview/#iterations
>
>但是,我发现Flink 
>1.19版本的文档中不再提及iterate操作。这让我有些困惑。不知道在最新版本中,这是否意味着iterate操作不再被支持?如果是这样的话,请问我该如何在数据流上进行迭代计算?
>
>非常感谢您的时间和帮助,期待您的回复。
>
>谢谢!
>
>李智诚


咨询Flink 1.19文档中关于iterate操作

2024-05-20 文章 www
尊敬的Flink开发团队:

您好!

我目前正在学习如何使用Apache Flink的DataStream API实现迭代算法,例如图的单源最短路径。在Flink 
1.18版本的文档中,我注意到有关于iterate操作的介绍,具体请见:https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/overview/#iterations

但是,我发现Flink 
1.19版本的文档中不再提及iterate操作。这让我有些困惑。不知道在最新版本中,这是否意味着iterate操作不再被支持?如果是这样的话,请问我该如何在数据流上进行迭代计算?

非常感谢您的时间和帮助,期待您的回复。

谢谢!

李智诚

Re: flinksql 经过优化后,group by字段少了

2024-05-20 文章 Lincoln Lee
Hi,

可以尝试下 1.17 或更新的版本, 这个问题在 flink 1.17.0 中已修复[1]。
批处理中做这个 remove 优化是符合语义的,而在流中不能直接裁剪,
对于相关时间函数的说明文档[2]中也进行了更新

[1] https://issues.apache.org/jira/browse/FLINK-30006
[2]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/udfs/#%e5%86%85%e7%bd%ae%e5%87%bd%e6%95%b0%e7%9a%84%e7%a1%ae%e5%ae%9a%e6%80%a7


Best,
Lincoln Lee


℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> 于2024年5月20日周一 22:07写道:

> 当前用的是 flink 1.16 版本,这个issue虽然合并到了 calcite-1.22.0 中,但是在之后一段时间内,又被新的 pr (
> https://github.com/apache/calcite/pull/1735/files)合并了。
> 所以,当前flink中是仍然存在这个问题。
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> libenc...@apache.org;
> 发送时间:2024年5月20日(星期一) 中午12:51
> 收件人:"user-zh"
> 主题:Re: flinksql 经过优化后,group by字段少了
>
>
>
> 你引用的这个 calcite 的 issue[1] 是在 calcite-1.22.0 版本中就修复了的,Flink 应该从 1.11
> 版本开始就已经用的是这个 calcite 版本了。
>
> 所以你用的是哪个版本的 Flink 呢,感觉这个可能是另外一个问题。如果可以在当前最新的版本 1.19 中复现这个问题的话,可以建一个
> issue 来报一个 bug。
>
> PS:
> 上面我说的这个行为,我稍微确认下了,这个应该是一个代码生成阶段才做的区分,所以优化过程中并不能识别,所以即使是batch模式下,优化的plan也应该是包含dt字段的。
>
> [1] https://issues.apache.org/jira/browse/CALCITE-3531
>
> ℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid 于2024年5月20日周一 11:06写道:
> 
>  您好,当前是流任务。我跟了下代码,cast(CURRENT_DATE as string) 被识别了常量。这个问题已经在 calcite
> 中修复了,https://github.com/apache/calcite/pull/1602/files
>  ; 但是,flink 中引用的
> calcite 版本并没有修复这个问题。我这边通过自定义 udf 来规避了这个问题。
> 
> 
> 
> 
>  --nbsp;原始邮件nbsp;--
> 
> 发件人:
> "user-zh"
>   发送时间:nbsp;2024年5月20日(星期一) 上午10:32
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: flinksql 经过优化后,group by字段少了
> 
> 
> 
>  看起来像是因为 "dt = cast(CURRENT_DATEnbsp; as string)" 推导 dt
> 这个字段是个常量,进而被优化掉了。
> 
>  将 CURRENT_DATE 优化为常量的行为应该只在 batch 模式下才是这样的,你这个 SQL 是跑在 batch 模式下的嘛?
> 
>  ℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalidgt; 于2024年5月19日周日 01:01写道:
>  gt;
>  gt; create view tmp_view as
>  gt; SELECT
>  gt;nbsp;nbsp;nbsp;nbsp; dt, -- 2
>  gt;nbsp;nbsp;nbsp;nbsp; uid, -- 0
>  gt;nbsp;nbsp;nbsp;nbsp; uname, -- 1
>  gt;nbsp;nbsp;nbsp;nbsp; uage -- 3
>  gt; from
>  gt;nbsp;nbsp;nbsp;nbsp; kafkaTable
>  gt; where dt = cast(CURRENT_DATEnbsp; as string);
>  gt;
>  gt; insert into printSinkTable
>  gt; select
>  gt;nbsp;nbsp;nbsp;nbsp; dt, uid, uname,
> sum(uage)
>  gt; from tmp_view
>  gt; group by
>  gt;nbsp;nbsp;nbsp;nbsp; dt,
>  gt;nbsp;nbsp;nbsp;nbsp; uid,
>  gt;nbsp;nbsp;nbsp;nbsp; uname;
>  gt;
>  gt;
>  gt;
>  gt; sql 比较简单,首先根据 dt = current_date 条件进行过滤,然后 按照dt、uid、uname
> 三个字段进行聚合求和操作。
>  gt; 但是,经过优化后,生成的 物理结构如下:
>  gt; == Optimized Execution Plan ==
>  gt;
> Sink(table=[default_catalog.default_database.printSinkTable], fields=[dt,
> uid, uname, EXPR$3])
>  gt; +- Calc(select=[CAST(CAST(CURRENT_DATE())) AS dt, uid,
> uname, EXPR$3])
>  gt; amp;nbsp; amp;nbsp;+- GroupAggregate(groupBy=[uid,
> uname], select=[uid, uname, SUM(uage) AS EXPR$3])
>  gt; amp;nbsp; amp;nbsp; amp;nbsp; +-
> Exchange(distribution=[hash[uid, uname]])
>  gt; amp;nbsp; amp;nbsp; amp;nbsp; amp;nbsp;
> amp;nbsp;+- Calc(select=[uid, uname, uage], where=[(dt =
> CAST(CURRENT_DATE()))])
>  gt; amp;nbsp; amp;nbsp; amp;nbsp; amp;nbsp;
> amp;nbsp; amp;nbsp; +- TableSourceScan(table=[[default_catalog,
> default_database, kafkaTable]], fields=[uid, uname, dt, uage])
>  gt;
>  gt;
>  gt;
>  gt; 请问,这个时候,怎么实现按照 dt\uid\uname 三个字段聚合求和。感谢
> 
> 
> 
>  --
> 
>  Best,
>  Benchao Li
>
>
>
> --
>
> Best,
> Benchao Li
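
上文提到的自定义 UDF 规避方式,大致可以这样写:一个返回当前日期、但声明为非确定性的标量函数,这样优化器就不会把它折叠成常量(函数名和注册方式仅作示意):

~~~java
import java.time.LocalDate;
import org.apache.flink.table.functions.ScalarFunction;

// Returns today's date as 'yyyy-MM-dd', but is marked non-deterministic so the
// planner evaluates it per record instead of constant-folding it away.
public class NonDeterministicCurrentDate extends ScalarFunction {

    public String eval() {
        return LocalDate.now().toString();
    }

    @Override
    public boolean isDeterministic() {
        return false;
    }
}

// registration and use, e.g.:
// tableEnv.createTemporarySystemFunction("my_current_date", NonDeterministicCurrentDate.class);
// ... WHERE dt = my_current_date() GROUP BY dt, uid, uname ...
~~~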


Re: flinksql 经过优化后,group by字段少了

2024-05-19 文章 Benchao Li
你引用的这个 calcite 的 issue[1] 是在 calcite-1.22.0 版本中就修复了的,Flink 应该从 1.11
版本开始就已经用的是这个 calcite 版本了。

所以你用的是哪个版本的 Flink 呢,感觉这个可能是另外一个问题。如果可以在当前最新的版本 1.19 中复现这个问题的话,可以建一个
issue 来报一个 bug。

PS: 
上面我说的这个行为,我稍微确认下了,这个应该是一个代码生成阶段才做的区分,所以优化过程中并不能识别,所以即使是batch模式下,优化的plan也应该是包含dt字段的。

[1] https://issues.apache.org/jira/browse/CALCITE-3531

℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> 于2024年5月20日周一 11:06写道:
>
> 您好,当前是流任务。我跟了下代码,cast(CURRENT_DATE as string) 被识别了常量。这个问题已经在 calcite 
> 中修复了,https://github.com/apache/calcite/pull/1602/files
> 但是,flink 中引用的 calcite 版本并没有修复这个问题。我这边通过自定义 udf 来规避了这个问题。
>
>
>
>
> --原始邮件--
> 发件人:  
>   "user-zh"   
>  
>  发送时间:2024年5月20日(星期一) 上午10:32
> 收件人:"user-zh"
> 主题:Re: flinksql 经过优化后,group by字段少了
>
>
>
> 看起来像是因为 "dt = cast(CURRENT_DATE as string)" 推导 dt 这个字段是个常量,进而被优化掉了。
>
> 将 CURRENT_DATE 优化为常量的行为应该只在 batch 模式下才是这样的,你这个 SQL 是跑在 batch 模式下的嘛?
>
> ℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid 于2024年5月19日周日 01:01写道:
> 
>  create view tmp_view as
>  SELECT
>  dt, -- 2
>  uid, -- 0
>  uname, -- 1
>  uage -- 3
>  from
>  kafkaTable
>  where dt = cast(CURRENT_DATE as string);
> 
>  insert into printSinkTable
>  select
>  dt, uid, uname, sum(uage)
>  from tmp_view
>  group by
>  dt,
>  uid,
>  uname;
> 
> 
> 
>  sql 比较简单,首先根据 dt = current_date 条件进行过滤,然后 按照dt、uid、uname 三个字段进行聚合求和操作。
>  但是,经过优化后,生成的 物理结构如下:
>  == Optimized Execution Plan ==
>  Sink(table=[default_catalog.default_database.printSinkTable], 
> fields=[dt, uid, uname, EXPR$3])
>  +- Calc(select=[CAST(CAST(CURRENT_DATE())) AS dt, uid, uname, EXPR$3])
>  nbsp; nbsp;+- GroupAggregate(groupBy=[uid, uname], 
> select=[uid, uname, SUM(uage) AS EXPR$3])
>  nbsp; nbsp; nbsp; +- Exchange(distribution=[hash[uid, 
> uname]])
>  nbsp; nbsp; nbsp; nbsp; nbsp;+- 
> Calc(select=[uid, uname, uage], where=[(dt = CAST(CURRENT_DATE()))])
>  nbsp; nbsp; nbsp; nbsp; nbsp; nbsp; +- 
> TableSourceScan(table=[[default_catalog, default_database, kafkaTable]], 
> fields=[uid, uname, dt, uage])
> 
> 
> 
>  请问,这个时候,怎么实现按照 dt\uid\uname 三个字段聚合求和。感谢
>
>
>
> --
>
> Best,
> Benchao Li



-- 

Best,
Benchao Li


Re: flinksql 经过优化后,group by字段少了

2024-05-19 文章 Benchao Li
看起来像是因为 "dt = cast(CURRENT_DATE  as string)" 推导 dt 这个字段是个常量,进而被优化掉了。

将 CURRENT_DATE 优化为常量的行为应该只在 batch 模式下才是这样的,你这个 SQL 是跑在 batch 模式下的嘛?

℡小新的蜡笔不见嘞、 <1515827...@qq.com.invalid> 于2024年5月19日周日 01:01写道:
>
> create view tmp_view as
> SELECT
> dt, -- 2
> uid, -- 0
> uname, -- 1
> uage -- 3
> from
> kafkaTable
> where dt = cast(CURRENT_DATE  as string);
>
> insert into printSinkTable
> select
> dt, uid, uname, sum(uage)
> from tmp_view
> group by
> dt,
> uid,
> uname;
>
>
>
> sql 比较简单,首先根据 dt = current_date 条件进行过滤,然后 按照dt、uid、uname 三个字段进行聚合求和操作。
> 但是,经过优化后,生成的 物理结构如下:
> == Optimized Execution Plan ==
> Sink(table=[default_catalog.default_database.printSinkTable], fields=[dt, 
> uid, uname, EXPR$3])
> +- Calc(select=[CAST(CAST(CURRENT_DATE())) AS dt, uid, uname, EXPR$3])
>  +- GroupAggregate(groupBy=[uid, uname], select=[uid, uname, 
> SUM(uage) AS EXPR$3])
>+- Exchange(distribution=[hash[uid, uname]])
> +- Calc(select=[uid, uname, uage], 
> where=[(dt = CAST(CURRENT_DATE()))])
>   +- 
> TableSourceScan(table=[[default_catalog, default_database, kafkaTable]], 
> fields=[uid, uname, dt, uage])
>
>
>
> 请问,这个时候,怎么实现按照 dt\uid\uname 三个字段聚合求和。感谢



-- 

Best,
Benchao Li


Re: Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released

2024-05-19 文章 Jingsong Li
CC to the Paimon community.

Best,
Jingsong

On Mon, May 20, 2024 at 9:55 AM Jingsong Li  wrote:
>
> Amazing, congrats!
>
> Best,
> Jingsong
>
> On Sat, May 18, 2024 at 3:10 PM 大卫415 <2446566...@qq.com.invalid> wrote:
> >
> > 退订
> >
> >
> >
> >
> >
> >
> >
> > Original Email
> >
> >
> >
> > Sender:"gongzhongqiang"< gongzhongqi...@apache.org ;
> >
> > Sent Time:2024/5/17 23:10
> >
> > To:"Qingsheng Ren"< re...@apache.org ;
> >
> > Cc recipient:"dev"< d...@flink.apache.org ;"user"< 
> > u...@flink.apache.org ;"user-zh"< user-zh@flink.apache.org ;"Apache 
> > Announce List"< annou...@apache.org ;
> >
> > Subject:Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released
> >
> >
> > Congratulations !
> > Thanks for all contributors.
> >
> >
> > Best,
> >
> > Zhongqiang Gong
> >
> > Qingsheng Ren  于 2024年5月17日周五 17:33写道:
> >
> >  The Apache Flink community is very happy to announce the release of
> >  Apache Flink CDC 3.1.0.
> > 
> >  Apache Flink CDC is a distributed data integration tool for real time
> >  data and batch data, bringing the simplicity and elegance of data
> >  integration via YAML to describe the data movement and transformation
> >  in a data pipeline.
> > 
> >  Please check out the release blog post for an overview of the release:
> > 
> >  
> > https://flink.apache.org/2024/05/17/apache-flink-cdc-3.1.0-release-announcement/
> > 
> >  The release is available for download at:
> >  https://flink.apache.org/downloads.html
> > 
> >  Maven artifacts for Flink CDC can be found at:
> >  https://search.maven.org/search?q=g:org.apache.flink%20cdc
> > 
> >  The full release notes are available in Jira:
> > 
> >  
> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522version=12354387
> > 
> >  We would like to thank all contributors of the Apache Flink community
> >  who made this release possible!
> > 
> >  Regards,
> >  Qingsheng Ren
> > 


Re: Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released

2024-05-19 文章 Jingsong Li
Amazing, congrats!

Best,
Jingsong

On Sat, May 18, 2024 at 3:10 PM 大卫415 <2446566...@qq.com.invalid> wrote:
>
> 退订
>
>
>
>
>
>
>
> Original Email
>
>
>
> Sender:"gongzhongqiang"< gongzhongqi...@apache.org ;
>
> Sent Time:2024/5/17 23:10
>
> To:"Qingsheng Ren"< re...@apache.org ;
>
> Cc recipient:"dev"< d...@flink.apache.org ;"user"< u...@flink.apache.org 
> ;"user-zh"< user-zh@flink.apache.org ;"Apache Announce List"< 
> annou...@apache.org ;
>
> Subject:Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released
>
>
> Congratulations !
> Thanks for all contributors.
>
>
> Best,
>
> Zhongqiang Gong
>
> Qingsheng Ren  于 2024年5月17日周五 17:33写道:
>
>  The Apache Flink community is very happy to announce the release of
>  Apache Flink CDC 3.1.0.
> 
>  Apache Flink CDC is a distributed data integration tool for real time
>  data and batch data, bringing the simplicity and elegance of data
>  integration via YAML to describe the data movement and transformation
>  in a data pipeline.
> 
>  Please check out the release blog post for an overview of the release:
> 
>  
> https://flink.apache.org/2024/05/17/apache-flink-cdc-3.1.0-release-announcement/
> 
>  The release is available for download at:
>  https://flink.apache.org/downloads.html
> 
>  Maven artifacts for Flink CDC can be found at:
>  https://search.maven.org/search?q=g:org.apache.flink%20cdc
> 
>  The full release notes are available in Jira:
> 
>  
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522version=12354387
> 
>  We would like to thank all contributors of the Apache Flink community
>  who made this release possible!
> 
>  Regards,
>  Qingsheng Ren
> 


Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released

2024-05-17 文章 gongzhongqiang
Congratulations !
Thanks for all contributors.


Best,

Zhongqiang Gong

Qingsheng Ren  于 2024年5月17日周五 17:33写道:

> The Apache Flink community is very happy to announce the release of
> Apache Flink CDC 3.1.0.
>
> Apache Flink CDC is a distributed data integration tool for real time
> data and batch data, bringing the simplicity and elegance of data
> integration via YAML to describe the data movement and transformation
> in a data pipeline.
>
> Please check out the release blog post for an overview of the release:
>
> https://flink.apache.org/2024/05/17/apache-flink-cdc-3.1.0-release-announcement/
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Maven artifacts for Flink CDC can be found at:
> https://search.maven.org/search?q=g:org.apache.flink%20cdc
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12354387
>
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
>
> Regards,
> Qingsheng Ren
>


Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released

2024-05-17 文章 Hang Ruan
Congratulations!

Thanks for the great work.

Best,
Hang

Qingsheng Ren  于2024年5月17日周五 17:33写道:

> The Apache Flink community is very happy to announce the release of
> Apache Flink CDC 3.1.0.
>
> Apache Flink CDC is a distributed data integration tool for real time
> data and batch data, bringing the simplicity and elegance of data
> integration via YAML to describe the data movement and transformation
> in a data pipeline.
>
> Please check out the release blog post for an overview of the release:
>
> https://flink.apache.org/2024/05/17/apache-flink-cdc-3.1.0-release-announcement/
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Maven artifacts for Flink CDC can be found at:
> https://search.maven.org/search?q=g:org.apache.flink%20cdc
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12354387
>
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
>
> Regards,
> Qingsheng Ren
>


Re: [ANNOUNCE] Apache Flink CDC 3.1.0 released

2024-05-17 文章 Leonard Xu
Congratulations !

Thanks Qingsheng for the great work and all contributors involved !!

Best,
Leonard


> 2024年5月17日 下午5:32,Qingsheng Ren  写道:
> 
> The Apache Flink community is very happy to announce the release of
> Apache Flink CDC 3.1.0.
> 
> Apache Flink CDC is a distributed data integration tool for real time
> data and batch data, bringing the simplicity and elegance of data
> integration via YAML to describe the data movement and transformation
> in a data pipeline.
> 
> Please check out the release blog post for an overview of the release:
> https://flink.apache.org/2024/05/17/apache-flink-cdc-3.1.0-release-announcement/
> 
> The release is available for download at:
> https://flink.apache.org/downloads.html
> 
> Maven artifacts for Flink CDC can be found at:
> https://search.maven.org/search?q=g:org.apache.flink%20cdc
> 
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12354387
> 
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
> 
> Regards,
> Qingsheng Ren



[ANNOUNCE] Apache Flink CDC 3.1.0 released

2024-05-17 文章 Qingsheng Ren
The Apache Flink community is very happy to announce the release of
Apache Flink CDC 3.1.0.

Apache Flink CDC is a distributed data integration tool for real time
data and batch data, bringing the simplicity and elegance of data
integration via YAML to describe the data movement and transformation
in a data pipeline.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/2024/05/17/apache-flink-cdc-3.1.0-release-announcement/

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink CDC can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20cdc

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12354387

We would like to thank all contributors of the Apache Flink community
who made this release possible!

Regards,
Qingsheng Ren


Re: Flink 1.18.1 ,重启状态恢复

2024-05-16 文章 Yanfei Lei
看起来和 FLINK-34063 / FLINK-33863 是同样的问题,您可以升级到1.18.2 试试看。
[1] https://issues.apache.org/jira/browse/FLINK-33863
[2] https://issues.apache.org/jira/browse/FLINK-34063

陈叶超  于2024年5月16日周四 16:38写道:
>
> 升级到 flink 1.18.1 ,任务重启状态恢复的话,遇到如下报错:
> 2024-04-09 13:03:48
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:258)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:256)
> at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
> at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
> at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
> state backend for 
> RowDataStoreWriteOperator_8d96fc510e75de3baf03ef7367db7d42_(2/2) from any of 
> the 1 provided restore options.
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:289)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:176)
> ... 11 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed 
> when trying to restore operator state backend
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:88)
> at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:533)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:380)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:280)
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> ... 13 more
> Caused by: java.io.IOException: invalid stream header
> at 
> org.xerial.snappy.SnappyFramedInputStream.(SnappyFramedInputStream.java:235)
> at 
> org.xerial.snappy.SnappyFramedInputStream.(SnappyFramedInputStream.java:145)
> at 
> org.xerial.snappy.SnappyFramedInputStream.(SnappyFramedInputStream.java:129)
> at 
> org.apache.flink.runtime.state.SnappyStreamCompressionDecorator.decorateWithCompression(SnappyStreamCompressionDecorator.java:53)
> at 
> org.apache.flink.runtime.state.StreamCompressionDecorator.decorateWithCompression(StreamCompressionDecorator.java:60)
> at 
> org.apache.flink.runtime.state.CompressibleFSDataInputStream.(CompressibleFSDataInputStream.java:39)
> at 
> org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:185)
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:85)
> ... 18 more
>


-- 
Best,
Yanfei


Get access to unmatching events in Apache Flink Cep

2024-05-16 文章 Anton Sidorov
Hello!

I have a Flink Job with CEP pattern.

Pattern example:

// Strict Contiguity
// a b+ c d e
Pattern.begin("a", AfterMatchSkipStrategy.skipPastLastEvent()).where(...)
.next("b").where(...).oneOrMore()
.next("c").where(...)
.next("d").where(...)
.next("e").where(...);

On input I have a stream of events in the wrong order:

a b d c e

On the output side I don't get any match, but I want to have access to the
events that did not match.

Can I access the intermediate NFA state of the CEP pattern, or is there some
other way to view the unmatched events?

Example project with CEP pattern on github
, and my question
on SO


Thanks in advance
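
A sketch of one way to surface partially matched sequences: bound the pattern with within() so incomplete matches time out, then collect them via the timeout side output of a PatternProcessFunction that also implements TimedOutPartialMatchHandler. Event type, tag name and timeout below are placeholders, not taken from the project above:

~~~java
import java.util.List;
import java.util.Map;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class PartialMatchSketch {

    public static class Event {}   // placeholder event type

    // side output tag that will carry the timed-out (never completed) sequences
    static final OutputTag<String> PARTIAL = new OutputTag<String>("timed-out-partial") {};

    public static SingleOutputStreamOperator<String> attach(
            DataStream<Event> events, Pattern<Event, ?> pattern) {

        // within() is required so that incomplete sequences eventually time out
        PatternStream<Event> ps = CEP.pattern(events, pattern.within(Time.minutes(1)));

        SingleOutputStreamOperator<String> matches = ps.process(new MatchFn());
        // matches.getSideOutput(PARTIAL) now carries the events of sequences that
        // started to match (e.g. a b d ...) but never reached the final state
        return matches;
    }

    static class MatchFn extends PatternProcessFunction<Event, String>
            implements TimedOutPartialMatchHandler<Event> {

        @Override
        public void processMatch(Map<String, List<Event>> match, Context ctx,
                                 Collector<String> out) {
            out.collect("full match: " + match);
        }

        @Override
        public void processTimedOutMatch(Map<String, List<Event>> partialMatch, Context ctx) {
            ctx.output(PARTIAL, "partial match: " + partialMatch);
        }
    }
}
~~~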


Flink 1.18.1 ,重启状态恢复

2024-05-16 文章 陈叶超
升级到 flink 1.18.1 ,任务重启状态恢复的话,遇到如下报错:
2024-04-09 13:03:48
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:258)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:256)
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
state backend for 
RowDataStoreWriteOperator_8d96fc510e75de3baf03ef7367db7d42_(2/2) from any of 
the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:289)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:176)
... 11 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore operator state backend
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:88)
at 
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:533)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:380)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:280)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
... 13 more
Caused by: java.io.IOException: invalid stream header
at 
org.xerial.snappy.SnappyFramedInputStream.(SnappyFramedInputStream.java:235)
at 
org.xerial.snappy.SnappyFramedInputStream.(SnappyFramedInputStream.java:145)
at 
org.xerial.snappy.SnappyFramedInputStream.(SnappyFramedInputStream.java:129)
at 
org.apache.flink.runtime.state.SnappyStreamCompressionDecorator.decorateWithCompression(SnappyStreamCompressionDecorator.java:53)
at 
org.apache.flink.runtime.state.StreamCompressionDecorator.decorateWithCompression(StreamCompressionDecorator.java:60)
at 
org.apache.flink.runtime.state.CompressibleFSDataInputStream.(CompressibleFSDataInputStream.java:39)
at 
org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:185)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:85)
... 18 more



Re:Re: use flink 1.19 JDBC Driver can find jdbc connector

2024-05-15 文章 Xuyang
Hi, 

> 现在可以用中文了?

我看你发的是中文答疑邮箱




> 就是opt目录里面的gateway.jar直接编辑Factory文件把connector注册就行了

你的意思是,之前报错类似"找不到一个jdbc 
connector",然后直接在gateway的jar包里的META-INF/services内的Factory文件(SPI文件)内加入jdbc 
connector的Factory实现类就好了吗?




如果是这个问题就有点奇怪,因为本身flink-connector-jdbc的spi文件就已经将相关的类写进去了[1],按理说放到lib目录下,就会spi发现的




[1] 
https://github.com/apache/flink-connector-jdbc/blob/bde28e6a92ffa75ae45bc8df6be55d299ff995a2/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory#L16
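
排查时可以用类似下面的小程序(与 gateway 相同的 classpath 下运行),列出 SPI 能发现的所有 table factory,确认 jdbc 是否在其中(仅作排查示意):

~~~java
import java.util.ServiceLoader;
import org.apache.flink.table.factories.Factory;

public class ListFactories {
    public static void main(String[] args) {
        // "jdbc" should show up here if flink-connector-jdbc is picked up via SPI
        for (Factory f : ServiceLoader.load(Factory.class)) {
            System.out.println(f.factoryIdentifier() + "  (" + f.getClass().getName() + ")");
        }
    }
}
~~~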




--

Best!
Xuyang





在 2024-05-15 15:51:49,abc15...@163.com 写道:
>现在可以用中文了?就是opt目录里面的gateway.jar直接编辑Factory文件把connector注册就行了
>
>
>> 在 2024年5月15日,15:36,Xuyang  写道:
>> 
>> Hi, 看起来你之前的问题是jdbc driver找不到,可以简单描述下你的解决的方法吗?“注册connection数的数量”有点不太好理解。
>> 
>> 
>> 
>> 
>> 如果确实有类似的问题、并且通过这种手段解决了的话,可以建一个improvement的jira issue[1]来帮助社区跟踪、改善这个问题,感谢!
>> 
>> 
>> 
>> 
>> [1] https://issues.apache.org/jira/projects/FLINK/summary
>> 
>> 
>> 
>> 
>> --
>> 
>>Best!
>>Xuyang
>> 
>> 
>> 
>> 
>> 
>>> 在 2024-05-10 12:26:22,abc15...@163.com 写道:
>>> I've solved it. You need to register the number of connections in the jar 
>>> of gateway. But this is inconvenient, and I still hope to improve it.
>>> 发自我的 iPhone
>>> 
> 在 2024年5月10日,11:56,Xuyang  写道:
 
 Hi, can you print the classloader and verify if the jdbc connector exists 
 in it?
 
 
 
 
 --
 
   Best!
   Xuyang
 
 
 
 
 
 At 2024-05-09 17:48:33, "McClone"  wrote:
> I put flink-connector-jdbc into flink\lib.use flink 1.19 JDBC Driver can 
> not  find jdbc connector,but use sql-client is normal.


Re: use flink 1.19 JDBC Driver can find jdbc connector

2024-05-15 文章 abc15606
现在可以用中文了?就是opt目录里面的gateway.jar直接编辑Factory文件把connector注册就行了


> 在 2024年5月15日,15:36,Xuyang  写道:
> 
> Hi, 看起来你之前的问题是jdbc driver找不到,可以简单描述下你的解决的方法吗?“注册connection数的数量”有点不太好理解。
> 
> 
> 
> 
> 如果确实有类似的问题、并且通过这种手段解决了的话,可以建一个improvement的jira issue[1]来帮助社区跟踪、改善这个问题,感谢!
> 
> 
> 
> 
> [1] https://issues.apache.org/jira/projects/FLINK/summary
> 
> 
> 
> 
> --
> 
>Best!
>Xuyang
> 
> 
> 
> 
> 
>> 在 2024-05-10 12:26:22,abc15...@163.com 写道:
>> I've solved it. You need to register the number of connections in the jar of 
>> gateway. But this is inconvenient, and I still hope to improve it.
>> 发自我的 iPhone
>> 
 在 2024年5月10日,11:56,Xuyang  写道:
>>> 
>>> Hi, can you print the classloader and verify if the jdbc connector exists 
>>> in it?
>>> 
>>> 
>>> 
>>> 
>>> --
>>> 
>>>   Best!
>>>   Xuyang
>>> 
>>> 
>>> 
>>> 
>>> 
>>> At 2024-05-09 17:48:33, "McClone"  wrote:
 I put flink-connector-jdbc into flink\lib.use flink 1.19 JDBC Driver can 
 not  find jdbc connector,but use sql-client is normal.



Re:请问如何贡献Flink Hologres连接器?

2024-05-15 文章 Xuyang
Hi, 

我觉得如果只是从贡献的角度来说,支持flink hologres 
connector是没问题的,hologres目前作为比较热门的数据库,肯定是有很多需求的,并且现在aliyun 
github官方也基于此提供了开源的flink hologres connector[1]。





但是涉及到aliyun等公司商业化的ververica-connector-hologres包,如果想直接开源的话,在我的角度最好事先确认下面几点,不然可能会隐含一些法律风险

  1. jar包的提供方(aliyun等公司)是否知情、且愿意开源,不然直接拿着商业化的东西给出来有点不太好

2. jar包内的协议是否满足开源的协议,而不是商业化的协议




我推荐如果真要开源,可以基于开源github仓库的flink hologres connector[1]来贡献(比如现在我看目前它最高支持flink 
1.17,可以试试贡献支持到1.18、1.19等等)




[1] https://github.com/aliyun/alibabacloud-hologres-connectors




--

Best!
Xuyang





在 2024-05-14 11:24:37,"casel.chen"  写道:
>我们有使用阿里云商业版Hologres数据库,同时我们有自研的Flink实时计算平台,为了实现在Hologres上实时建仓,我们基于开源Apache 
>Flink 1.17.1结合阿里云maven仓库的ververica-connector-hologres包[1]和开源的holo 
>client[2]开发了hologres 
>connector,修复了一些jar依赖问题。目前我们已经在生产环境使用了一段时间,暂时没有发现问题,现在想将它贡献给社区。
>
>
>请问:
>1. 贡献Flink Hologres连接器是否合规?
>2. 如果合规的话,PR应该提到哪个项目代码仓库?
>3. 还是说要像 https://flink-packages.org/categories/connectors 
>这样链接到自己的github仓库?如果是的话要怎么在flink-packages.org上面注册呢?
>
>
>[1] 
>https://repo1.maven.org/maven2/com/alibaba/ververica/ververica-connector-hologres/1.17-vvr-8.0.4-1/
>[2] 
>https://github.com/aliyun/alibabacloud-hologres-connectors/tree/master/holo-client


Re:Re: use flink 1.19 JDBC Driver can find jdbc connector

2024-05-15 文章 Xuyang
Hi, 看起来你之前的问题是jdbc driver找不到,可以简单描述下你的解决的方法吗?“注册connection数的数量”有点不太好理解。




如果确实有类似的问题、并且通过这种手段解决了的话,可以建一个improvement的jira issue[1]来帮助社区跟踪、改善这个问题,感谢!




[1] https://issues.apache.org/jira/projects/FLINK/summary




--

Best!
Xuyang





在 2024-05-10 12:26:22,abc15...@163.com 写道:
>I've solved it. You need to register the number of connections in the jar of 
>gateway. But this is inconvenient, and I still hope to improve it.
>发自我的 iPhone
>
>> 在 2024年5月10日,11:56,Xuyang  写道:
>> 
>> Hi, can you print the classloader and verify if the jdbc connector exists 
>> in it?
>> 
>> 
>> 
>> 
>> --
>> 
>>Best!
>>Xuyang
>> 
>> 
>> 
>> 
>> 
>> At 2024-05-09 17:48:33, "McClone"  wrote:
>>> I put flink-connector-jdbc into flink\lib.use flink 1.19 JDBC Driver can 
>>> not  find jdbc connector,but use sql-client is normal.


请问如何贡献Flink Hologres连接器?

2024-05-13 文章 casel.chen
我们有使用阿里云商业版Hologres数据库,同时我们有自研的Flink实时计算平台,为了实现在Hologres上实时建仓,我们基于开源Apache 
Flink 1.17.1结合阿里云maven仓库的ververica-connector-hologres包[1]和开源的holo 
client[2]开发了hologres 
connector,修复了一些jar依赖问题。目前我们已经在生产环境使用了一段时间,暂时没有发现问题,现在想将它贡献给社区。


请问:
1. 贡献Flink Hologres连接器是否合规?
2. 如果合规的话,PR应该提到哪个项目代码仓库?
3. 还是说要像 https://flink-packages.org/categories/connectors 
这样链接到自己的github仓库?如果是的话要怎么在flink-packages.org上面注册呢?


[1] 
https://repo1.maven.org/maven2/com/alibaba/ververica/ververica-connector-hologres/1.17-vvr-8.0.4-1/
[2] 
https://github.com/aliyun/alibabacloud-hologres-connectors/tree/master/holo-client

Re: 使用Kafka记录自身的时间戳问题

2024-05-13 文章 Biao Geng
Hi,

>>> 那这个时间戳是kafka接收到数据自动生成的时间吗?还是说消息发送给kafka的时候需要怎么设置把业务时间附上去?
这个时间戳来自Kafka record里的时间戳,可以参考代码

。它的生成逻辑由Kafka配置决定,如果用户没有指定的话,默认是消息创建时间,可以参考Kafka的文档


>>> 感觉应该是发送数据到kafka的时候需要把业务时间给附上去,那在sink端怎么把时间附上去呢?
flink的Kafka connector里KafkaSink的实现是默认用input record里的时间戳,可以参考这里的实现

。


Best,
Biao Geng


ha.fen...@aisino.com  于2024年5月8日周三 10:59写道:

>
> DataStream stream = env.fromSource(
> kafkaSource,
> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)),
> "mySource");
> 这样做使用的是Kafka记录自身的时间戳来定义watermark。
>
> 那这个时间戳是kafka接收到数据自动生成的时间吗?还是说消息发送给kafka的时候需要怎么设置把业务时间附上去?
> 感觉应该是发送数据到kafka的时候需要把业务时间给附上去,那在sink端怎么把时间附上去呢?
>
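
如果希望在写入 Kafka 时显式地把业务时间写进 record 的 timestamp(而不是依赖元素自带的流时间戳),可以自定义 KafkaRecordSerializationSchema,下面是一个示意(topic 名和 MyEvent 类型都是占位):

~~~java
import java.nio.charset.StandardCharsets;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class EventSerializationSchema
        implements KafkaRecordSerializationSchema<EventSerializationSchema.MyEvent> {

    // placeholder event type that carries its own business timestamp
    public static class MyEvent {
        public long businessTime;   // epoch millis
        public String payload;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            MyEvent element,
            KafkaRecordSerializationSchema.KafkaSinkContext context,
            Long timestamp) {
        byte[] value = element.payload.getBytes(StandardCharsets.UTF_8);
        // the 3rd constructor argument becomes the Kafka record timestamp:
        // here the event's business time is used instead of the stream timestamp
        return new ProducerRecord<>("my-topic", null, element.businessTime, null, value);
    }
}
~~~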


Re: use flink 1.19 JDBC Driver can find jdbc connector

2024-05-13 文章 kellygeorg...@163.com
退订



 Replied Message 
| From | abc15...@163.com |
| Date | 05/10/2024 12:26 |
| To | user-zh@flink.apache.org |
| Cc | |
| Subject | Re: use flink 1.19 JDBC Driver can find jdbc connector |
I've solved it. You need to register the number of connections in the jar of 
gateway. But this is inconvenient, and I still hope to improve it.
发自我的 iPhone

> 在 2024年5月10日,11:56,Xuyang  写道:
>
> Hi, can you print the classloader and verify if the jdbc connector exists in 
> it?
>
>
>
>
> --
>
>Best!
>Xuyang
>
>
>
>
>
> At 2024-05-09 17:48:33, "McClone"  wrote:
>> I put flink-connector-jdbc into flink\lib.use flink 1.19 JDBC Driver can not 
>>  find jdbc connector,but use sql-client is normal.


Re: 退订

2024-05-11 文章 Hang Ruan
请发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org 地址来取消订阅来自
 user-zh@flink.apache.org 
邮件组的邮件,你可以参考[1][2]
管理你的邮件订阅。

Best,
Hang

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

爱看书不识字  于2024年5月11日周六 10:06写道:

> 退订


Re: use flink 1.19 JDBC Driver can find jdbc connector

2024-05-09 文章 abc15606
I've solved it. You need to register the number of connections in the jar of 
gateway. But this is inconvenient, and I still hope to improve it.
发自我的 iPhone

> 在 2024年5月10日,11:56,Xuyang  写道:
> 
> Hi, can you print the classloader and verify if the jdbc connector exists in 
> it?
> 
> 
> 
> 
> --
> 
>Best!
>Xuyang
> 
> 
> 
> 
> 
> At 2024-05-09 17:48:33, "McClone"  wrote:
>> I put flink-connector-jdbc into flink\lib.use flink 1.19 JDBC Driver can not 
>>  find jdbc connector,but use sql-client is normal.



Re:use flink 1.19 JDBC Driver can find jdbc connector

2024-05-09 文章 Xuyang
Hi, can you print the classloader and verify if the jdbc connector exists in it?




--

Best!
Xuyang





At 2024-05-09 17:48:33, "McClone"  wrote:
>I put flink-connector-jdbc into flink\lib.use flink 1.19 JDBC Driver can not  
>find jdbc connector,but use sql-client is normal.


请问有没有公司可以提供开源Flink维保服务?

2024-05-09 文章 LIU Xiao
如题


Re: 退订

2024-05-09 文章 Yunfeng Zhou
Hi,

退订请发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org
.

Best,
yunfeng

On Thu, May 9, 2024 at 5:58 PM xpfei0811  wrote:
>
> 退订
>
>  回复的原邮件 
> | 发件人 | wangfengyang |
> | 发送日期 | 2024年04月23日 18:10 |
> | 收件人 | user-zh  |
> | 主题 | 退订 |
> 退订


回复:退订

2024-05-09 文章 xpfei0811
退订

 回复的原邮件 
| 发件人 | wangfengyang |
| 发送日期 | 2024年04月23日 18:10 |
| 收件人 | user-zh  |
| 主题 | 退订 |


回复:退订

2024-05-09 文章 xpfei0811
退订

 回复的原邮件 
| 发件人 | jh...@163.com |
| 发送日期 | 2024年04月20日 22:01 |
| 收件人 | user-zh  |
| 主题 | Re: 退订 |


| |
jhg22
|
|
jh...@163.com
|


jh...@163.com

发件人: 冮雪程
发送时间: 2024-04-19 18:01
收件人: user-zh@flink.apache.org
主题: 退订




| |
冮雪程
|
|
gxc_bigd...@163.com
|


 回复的原邮件 
| 发件人 | jh...@163.com |
| 发送日期 | 2024年04月18日 16:17 |
| 收件人 | user-zh |
| 主题 | Re: 回复:退订 |
退订



jh...@163.com

发件人: 我的意中人是个盖世英雄
发送时间: 2024-04-18 16:03
收件人: user-zh
主题: 回复:退订
退订



---原始邮件---
发件人: "willluzheng"

use flink 1.19 JDBC Driver can find jdbc connector

2024-05-09 文章 McClone
I put flink-connector-jdbc into flink\lib.use flink 1.19 JDBC Driver can not  
find jdbc connector,but use sql-client is normal.

Re: Re: java.io.IOException: Could not load the native RocksDB library

2024-05-06 文章 Yanfei Lei
或许您可以尝试参考下[1] 再验证下加载的问题。

BTW,目前看起来是有些依赖库找不到,librocksdbjni-win64.dll 当时是基于 VS2022
编译出来的,您也尝试下在本地安装下VS2022后重试。

[1] https://github.com/facebook/rocksdb/issues/2531#issuecomment-313209314

ha.fen...@aisino.com  于2024年5月7日周二 10:22写道:
>
> idea工具,win10操作系统
> java.io.IOException: Could not load the native RocksDB library
> at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded(EmbeddedRocksDBStateBackend.java:940)
>  ~[flink-statebackend-rocksdb-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded(EmbeddedRocksDBStateBackend.java:870)
>  ~[flink-statebackend-rocksdb-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:400)
>  ~[flink-statebackend-rocksdb-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90)
>  ~[flink-statebackend-rocksdb-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:393)
>  ~[flink-streaming-java-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>  ~[flink-streaming-java-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>  ~[flink-streaming-java-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:399)
>  ~[flink-streaming-java-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:180)
>  ~[flink-streaming-java-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:266)
>  ~[flink-streaming-java-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>  ~[flink-streaming-java-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:799)
>  ~[flink-streaming-java-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:753)
>  ~[flink-streaming-java-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>  ~[flink-streaming-java-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:753)
>  ~[flink-streaming-java-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:712)
>  ~[flink-streaming-java-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>  [flink-runtime-1.19.0.jar:1.19.0]
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
> [flink-runtime-1.19.0.jar:1.19.0]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) 
> [flink-runtime-1.19.0.jar:1.19.0]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
> [flink-runtime-1.19.0.jar:1.19.0]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
> Caused by: java.lang.UnsatisfiedLinkError: 
> C:\Users\Administrator\AppData\Local\Temp\minicluster_3997ce9addcd45323f4b8d2891c63181\tm_0\tmp\rocksdb-lib-b92bf66b523726cc074235a82f4c40f1\librocksdbjni-win64.dll:
>  Can't find dependent libraries
> at java.lang.ClassLoader$NativeLibrary.load(Native Method) ~[?:1.8.0_60]
> at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1938) ~[?:1.8.0_60]
> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1821) ~[?:1.8.0_60]
> at java.lang.Runtime.load0(Runtime.java:809) ~[?:1.8.0_60]
> at java.lang.System.load(System.java:1086) ~[?:1.8.0_60]
> at 
> org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:102)
>  ~[frocksdbjni-6.20.3-ververica-2.0.jar:?]
> at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:82) 
> ~[frocksdbjni-6.20.3-ververica-2.0.jar:?]
> at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded(EmbeddedRocksDBStateBackend.java:914)
>  ~[flink-statebackend-rocksdb-1.19.0.jar:1.19.0]
> ... 20 more
> 10:18:44,556 WARN  org.apache.flink.runtime.taskmanager.Task  
>   [] - TumblingEventTimeWindows -> Sink: Print to Std. Out (2/4)#0 
> (2500c455c9c458780199da504300da05_90bea66de1c231edf33913ecd54406c1_1_0) 
> switched from INITIALIZING to FAILED with failure cause:
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> 

Re: java.io.IOException: Could not load the native RocksDB library

2024-05-06 文章 Yanfei Lei
请问是什么开发环境呢? windows吗?
可以分享一下更详细的报错吗?比如.dll 找不到

ha.fen...@aisino.com  于2024年5月7日周二 09:34写道:
>
> Configuration config = new Configuration();
> config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
> config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
> config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:\\d:\\cdc");
>
> 开发环境Flink1.17包中运行没有问题
> 开发环境Flink1.19包中运行提示
>
> java.io.IOException: Could not load the native RocksDB library



-- 
Best,
Yanfei


Re: Flink sql retract to append

2024-04-30 文章 Zijun Zhao
以处理时间为升序,处理结果肯定不会出现回撤,因为往后的时间不会比当前时间小,你可以再试试这个去重

On Tue, Apr 30, 2024 at 3:35 PM 焦童  wrote:

> 谢谢你的建议  但是top-1也会产生回撤信息
>
> > 2024年4月30日 15:27,ha.fen...@aisino.com 写道:
> >
> > 可以参考这个
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/table/sql/queries/deduplication/
> > 1.11版本不知道是不是支持
> >
> > From: 焦童
> > Date: 2024-04-30 11:25
> > To: user-zh
> > Subject: Flink sql retract to append
> > Hello ,
> > 我使用Flink 1.11 版本 sql  进行数据去重(通过 group by
> 形式)但是这会产生回撤流,下游存储不支持回撤流信息仅支持append,在DataStream
> 中我可以通过状态进行去重,但是在sql中如何做到去重且不产生回撤流呢。谢谢各位
>
>
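
上面说的按处理时间升序、保留第一条的去重,大致写法如下(通过 Table API 内嵌 SQL;表名、字段名为占位,1.11 上还需要显式选择 blink planner)。由于按处理时间升序且每个 key 只保留第一条,结果是 append-only 的:

~~~java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AppendOnlyDedup {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // source_table is assumed to define a processing-time column: proctime AS PROCTIME();
        // sink_table is assumed to be an append-only sink
        tEnv.executeSql(
                "INSERT INTO sink_table "
                        + "SELECT id, name FROM ( "
                        + "  SELECT id, name, "
                        + "         ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime ASC) AS rn "
                        + "  FROM source_table "
                        + ") WHERE rn = 1");
    }
}
~~~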


Re:Re: 在idea中用CliFrontend提交job 报错 java.nio.ByteBuffer.position(I)Ljava/nio/ByteBuffer;

2024-04-30 文章 z_mmG
您好,根据您的指点,配置之后,重启了StandaloneSessionClusterEntrypoint和TaskManagerrunner,这个问题解决了,谢谢您

















在 2024-04-30 15:45:18,"Biao Geng"  写道:
>Hi,
>
>这个报错一般是JDK版本不一致导致的。建议统一build flink和执行flink作业时的Java版本,(都用JDK8 或者 都用JDK11)。
>用JDK11时没有sun.misc的问题可以试试勾选掉Idea的Settings-> Build, Execution and Deployment
>-> Compiler-> Java Compiler的Use '--release' option for cross-compilation'
>选项。
>
>Best,
>Biao Geng
>
>
>z_mmG <13520871...@163.com> wrote on Tue, Apr 30, 2024 at 15:08:
>
>>
>> The Flink 1.19 source code was built with JDK 11
>> Because it complained that sun.misc was missing, JDK 8 was used to launch it
>>
>> Connected to the target VM, address: '127.0.0.1:8339', transport: 'socket'
>>
>> Job has been submitted with JobID 0975ec264edfd11d236dd190e7708d70
>>
>>
>> 
>>
>>  The program finished with the following exception:
>>
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error:
>> org.apache.flink.client.program.ProgramInvocationException: Job failed
>> (JobID: 0975ec264edfd11d236dd190e7708d70)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:373)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
>>
>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
>>
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
>>
>> at
>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
>>
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
>>
>> Caused by: java.util.concurrent.ExecutionException:
>> org.apache.flink.client.program.ProgramInvocationException: Job failed
>> (JobID: 0975ec264edfd11d236dd190e7708d70)
>>
>> at
>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>>
>> at
>> org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:170)
>>
>> at
>> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:121)
>>
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2325)
>>
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2303)
>>
>> at org.apache.flink.streaming.examples.ys.WordCount.main(WordCount.java:34)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>
>> at java.lang.reflect.Method.invoke(Method.java:498)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356)
>>
>> ... 9 more
>>
>> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
>> failed (JobID: 0975ec264edfd11d236dd190e7708d70)
>>
>> at
>> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:130)
>>
>> at
>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>>
>> at
>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>>
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>
>> at
>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>>
>> at
>> org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:302)
>>
>> at
>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>
>> at
>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>>
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>
>> at
>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>>
>> at
>> org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$35(RestClusterClient.java:901)
>>
>> at
>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>
>> at
>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>>
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>
>> at
>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>>
>> at
>> 

Re:Re: Submitting a job with CliFrontend in IDEA fails with java.nio.ByteBuffer.position(I)Ljava/nio/ByteBuffer;

2024-04-30 by z_mmG
Hello, following your advice, both compiling and running now use JDK 11 and the sun.misc errors are gone, but submitting the job still fails with the same error.

D:\software\jdk-11.0.7\bin\java.exe 
-agentlib:jdwp=transport=dt_socket,address=127.0.0.1:11039,suspend=y,server=n 
-Dlog.file=./log/flink-client.log 
-Dlog4j.configuration=./conf/log4j-cli.properties 
-Dlog4j.configurationFile=./conf/log4j-cli.properties 
-Dlogback.configurationFile=./conf/logback.xml 
-javaagent:C:\Users\10575\AppData\Local\JetBrains\IntelliJIdea2023.2\captureAgent\debugger-agent.jar=file:/C:/Users/10575/AppData/Local/Temp/capture.props
 -Dfile.encoding=UTF-8 -classpath 
"D:\flink\ayslib\log4j-slf4j-impl-2.17.1.jar;D:\flink\ayslib\log4j-core-2.17.1.jar;D:\flink\ayslib\log4j-api-2.17.1.jar;D:\flink\ayslib\log4j-1.2-api-2.17.1.jar;D:\flink\ayslib\flink-table-runtime-1.20-SNAPSHOT.jar;D:\flink\ayslib\flink-table-planner-loader-1.20-SNAPSHOT.jar;D:\flink\ayslib\flink-table-api-java-uber-1.20-SNAPSHOT.jar;D:\flink\ayslib\flink-scala_2.12-1.20-SNAPSHOT.jar;D:\flink\ayslib\flink-json-1.20-SNAPSHOT.jar;D:\flink\ayslib\flink-dist-1.20-SNAPSHOT.jar;D:\flink\ayslib\flink-csv-1.20-SNAPSHOT.jar;D:\flink\ayslib\flink-connector-files-1.20-SNAPSHOT.jar;D:\flink\ayslib\flink-cep-1.20-SNAPSHOT.jar;D:\flink\flink-clients\target\classes;D:\flink\flink-core\target\classes;D:\flink\flink-core-api\target\classes;D:\flink\flink-annotations\target\classes;D:\flink\flink-metrics\flink-metrics-core\target\classes;D:\software\apache-maven-3.8.6\repo\org\apache\flink\flink-shaded-asm-9\9.5-17.0\flink-shaded-asm-9-9.5-17.0.jar;D:\software\apache-maven-3.8.6\repo\org\apache\flink\flink-shaded-jackson\2.14.2-17.0\flink-shaded-jackson-2.14.2-17.0.jar;D:\software\apache-maven-3.8.6\repo\org\apache\commons\commons-lang3\3.12.0\commons-lang3-3.12.0.jar;D:\software\apache-maven-3.8.6\repo\org\snakeyaml\snakeyaml-engine\2.6\snakeyaml-engine-2.6.jar;D:\software\apache-maven-3.8.6\repo\org\apache\commons\commons-text\1.10.0\commons-text-1.10.0.jar;D:\software\apache-maven-3.8.6\repo\com\esotericsoftware\kryo\kryo\2.24.0\kryo-2.24.0.jar;D:\software\apache-maven-3.8.6\repo\com\esotericsoftware\minlog\minlog\1.2\minlog-1.2.jar;D:\software\apache-maven-3.8.6\repo\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;D:\software\apache-maven-3.8.6\repo\org\apache\commons\commons-compress\1.26.0\commons-compress-1.26.0.jar;D:\software\apache-maven-3.8.6\repo\org\apache\flink\flink-shaded-guava\31.1-jre-17.0\flink-shaded-guava-31.1-jre-17.0.jar;D:\flink\flink-runtime\target\classes;D:\flink\flink-rpc\flink-rpc-core\target\classes;D:\flink\flink-rpc\flink-rpc-akka-loader\target\classes;D:\flink\flink-queryable-state\flink-queryable-state-client-java\target\classes;D:\flink\flink-filesystems\flink-hadoop-fs\target\classes;D:\software\apache-maven-3.8.6\repo\commons-io\commons-io\2.15.1\commons-io-2.15.1.jar;D:\software\apache-maven-3.8.6\repo\org\apache\flink\flink-shaded-netty\4.1.91.Final-17.0\flink-shaded-netty-4.1.91.Final-17.0.jar;D:\software\apache-maven-3.8.6\repo\org\apache\flink\flink-shaded-zookeeper-3\3.7.1-17.0\flink-shaded-zookeeper-3-3.7.1-17.0.jar;D:\software\apache-maven-3.8.6\repo\org\javassist\javassist\3.24.0-GA\javassist-3.24.0-GA.jar;D:\software\apache-maven-3.8.6\repo\org\xerial\snappy\snappy-java\1.1.10.4\snappy-java-1.1.10.4.jar;D:\software\apache-maven-3.8.6\repo\tools\profiler\async-profiler\2.9\async-profiler-2.9.jar;D:\software\apache-maven-3.8.6\repo\org\lz4\lz4-java\1.8.0\lz4-java-1.8.0.jar;D:\software\apache-maven-3.8.6\repo\io\airlift\aircompressor\0.21\aircompressor-0.21.jar;D:\flink\flink-optimizer\target\classes;D:\flink\flink-java\target\classes;D:\software\apache-maven-3.8.6\repo\org\apache\commons\commons-math3\3.6.1\commons-math3-3.6.1.jar;D:\software\apache-maven-3.8.6\repo\com\twitter\chill-java\0.7.6\chill-java-0.7.6.jar;D:\software\apache-maven-3.8.6\repo\commons-cli\commons-cli\1.5.0\commons-cli-1.5.0.jar;D:\flink\flink-streaming-java\target\classes;D:\flink\flink-connectors\flink-file-sink-common\target\classes;D:\flink\flink-connectors\flink-connector-datagen\target\classes;D:\flink\flink-datastream\target\classes;D:\flink\flink-datastrea
m-api\target\classes;D:\software\apache-maven-3.8.6\repo\org\apache\flink\flink-shaded-force-shading\17.0\flink-shaded-force-shading-17.0.jar;D:\software\apache-maven-3.8.6\repo\org\slf4j\slf4j-api\1.7.36\slf4j-api-1.7.36.jar;D:\software\apache-maven-3.8.6\repo\com\google\code\findbugs\jsr305\1.3.9\jsr305-1.3.9.jar;D:\software\apache-maven-3.8.6\repo\org\objenesis\objenesis\2.1\objenesis-2.1.jar;D:\software\JetBrains\IntelliJ
 IDEA 2023.2.5\lib\idea_rt.jar" org.apache.flink.client.cli.CliFrontend run -c 
org.apache.flink.streaming.examples.ys.WordCount ./WordCount.jar

Connected to the target VM, address: '127.0.0.1:11039', transport: 'socket'

WARNING: An illegal reflective access operation has occurred

WARNING: Illegal reflective access by 
org.apache.flink.streaming.runtime.translators.DataStreamV2SinkTransformationTranslator
 (file:/D:/flink/ayslib/flink-dist-1.20-SNAPSHOT.jar) to field 

Re: Submitting a job with CliFrontend in IDEA fails with java.nio.ByteBuffer.position(I)Ljava/nio/ByteBuffer;

2024-04-30 by Biao Geng
Hi,

This error is usually caused by a JDK version mismatch. It is recommended to use the same Java version for building Flink and for running Flink jobs (JDK 8 for both, or JDK 11 for both).
If you hit the missing sun.misc problem on JDK 11, try unchecking the 'Use '--release' option for cross-compilation'
option under IDEA's Settings -> Build, Execution and Deployment -> Compiler -> Java Compiler.

Best,
Biao Geng
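
The (I)Ljava/nio/ByteBuffer; descriptor in the error is the classic symptom of bytecode compiled against JDK 9 or later being run on JDK 8: ByteBuffer.position(int) gained a covariant ByteBuffer return type in JDK 9, so the method reference recorded at compile time does not exist on a JDK 8 runtime. Below is a minimal sketch of the mismatch and the usual source-level workaround; it is a general Java illustration, not taken from the Flink code, and the class name is made up.

    import java.nio.Buffer;
    import java.nio.ByteBuffer;

    public class ByteBufferCompatSketch {
        public static void main(String[] args) {
            ByteBuffer buf = ByteBuffer.allocate(16);

            // When compiled on JDK 9+ this call binds to the covariant override
            // ByteBuffer.position(int), i.e. descriptor (I)Ljava/nio/ByteBuffer;.
            // A JDK 8 runtime only has (I)Ljava/nio/Buffer; -> NoSuchMethodError.
            buf.position(4);

            // Defensive pattern for code that must also run on JDK 8: call through
            // the Buffer supertype so the JDK 8 descriptor is emitted regardless
            // of which JDK performed the compilation.
            ((Buffer) buf).position(8);

            System.out.println("position = " + buf.position());
        }
    }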


z_mmG <13520871...@163.com> wrote on Tue, Apr 30, 2024 at 15:08:

>
> The Flink 1.19 source code was built with JDK 11
> Because it complained that sun.misc was missing, JDK 8 was used to launch it
>
> Connected to the target VM, address: '127.0.0.1:8339', transport: 'socket'
>
> Job has been submitted with JobID 0975ec264edfd11d236dd190e7708d70
>
>
> 
>
>  The program finished with the following exception:
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: 0975ec264edfd11d236dd190e7708d70)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:373)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
>
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113)
>
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
>
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
>
> at
> org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
>
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>
> at
> org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
>
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: 0975ec264edfd11d236dd190e7708d70)
>
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>
> at
> org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:170)
>
> at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:121)
>
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2325)
>
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2303)
>
> at org.apache.flink.streaming.examples.ys.WordCount.main(WordCount.java:34)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356)
>
> ... 9 more
>
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
> failed (JobID: 0975ec264edfd11d236dd190e7708d70)
>
> at
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:130)
>
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>
> at
> org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:302)
>
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$35(RestClusterClient.java:901)
>
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>
> at
> org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:302)
>
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>
> at
> 

Re: Flink sql retract to append

2024-04-30 by 焦童
Thanks for the suggestion, but top-1 also produces retraction messages.

> On Apr 30, 2024, at 15:27, ha.fen...@aisino.com wrote:
> 
> You can refer to this:
> https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/table/sql/queries/deduplication/
> Not sure whether version 1.11 supports it
> 
> From: 焦童
> Date: 2024-04-30 11:25
> To: user-zh
> Subject: Flink sql retract to append
> Hello ,
> I am using Flink 1.11 SQL to deduplicate data (via group by),
> but this produces a retract stream; the downstream storage does not support retract messages, only append. In the DataStream API
> I can deduplicate with state, but how can I deduplicate in SQL without producing a retract stream? Thanks, everyone.



Submitting a job with CliFrontend in IDEA fails with java.nio.ByteBuffer.position(I)Ljava/nio/ByteBuffer;

2024-04-30 by z_mmG


The Flink 1.19 source code was built with JDK 11
Because it complained that sun.misc was missing, JDK 8 was used to launch it


Connected to the target VM, address: '127.0.0.1:8339', transport: 'socket'

Job has been submitted with JobID 0975ec264edfd11d236dd190e7708d70






 The program finished with the following exception:




org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: org.apache.flink.client.program.ProgramInvocationException: 
Job failed (JobID: 0975ec264edfd11d236dd190e7708d70)

at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:373)

at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113)

at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)

at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)

at 
org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)

at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)

at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)

Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 
0975ec264edfd11d236dd190e7708d70)

at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)

at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)

at 
org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:170)

at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:121)

at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2325)

at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2303)

at org.apache.flink.streaming.examples.ys.WordCount.main(WordCount.java:34)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356)

... 9 more

Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
failed (JobID: 0975ec264edfd11d236dd190e7708d70)

at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:130)

at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)

at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)

at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)

at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)

at 
org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:302)

at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)

at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)

at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)

at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)

at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$35(RestClusterClient.java:901)

at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)

at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)

at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)

at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)

at 
org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:302)

at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)

at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)

at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)

at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)

at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)

at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)


Flink sql retract to append

2024-04-29 by 焦童
Hello ,
 I am using Flink 1.11 SQL to deduplicate data (via group by), but this produces a retract stream;
the downstream storage does not support retract messages, only append. In the DataStream API
I can deduplicate with state, but how can I deduplicate in SQL without producing a retract stream? Thanks, everyone.

Re: As of Flink 1.18, is there any way to add a uid in the Table API?

2024-04-24 by Xuyang
Hi, if an operator is added in the middle or the processing logic is changed, the topology graph changes, so the uid derived from the topological order also changes, and restoring from state may then fail. See [1] for details.


At the moment the Table API does not expose the ability to set a custom uid. You could open a feature request in Jira [2] and then start a discussion on the dev mailing list.




[1] 
https://github.com/apache/flink/blob/92eef24d4cc531d6474252ef909fc6d431285dd9/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java#L243C38-L243C62
[2] https://issues.apache.org/jira/projects/FLINK/issues/



--

Best!
Xuyang





On 2024-04-25 01:18:55, "Guanlin Zhang"  wrote:
>Hi Team,
>
>Our business uses Flink MySQL CDC to OpenSearch with the TABLE API, in the form: INSERT INTO t1 SELECT *
>FROM t2.
>
>Since we may add extra operators while the job is running, is there a way to keep the state of the existing source and sink
>operators after restoring from a snapshot? I see that in the DataStream API this can be done by setting a uid. Does the Table API
>have an equivalent? I also see in the Flink jira: https://issues.apache.org/jira/browse/FLINK-28861
>that table.exec.uid.generation=PLAN_ONLY can be set. Under the default configuration, if a transformation
>operator is added in the middle or other changes are made, will the previous state be kept when restoring from a snapshot?
>
>
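
A minimal sketch contrasting the two mechanisms discussed in this thread, assuming a Flink version that already has the table.exec.uid.generation option (added via FLINK-28861, around 1.15.x/1.16); operator and class names are illustrative. In the DataStream API an explicit uid() pins an operator's state id so surrounding topology changes do not break restore; in the Table API the uid is derived from the compiled plan, so keeping the plan (and this option) stable is what preserves state.

    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class UidSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // DataStream API: an explicit uid keeps this operator's state addressable
            // even if operators are added around it later.
            SingleOutputStreamOperator<String> upper =
                    env.fromElements("a", "b", "c")
                            .map(String::toUpperCase)
                            .uid("to-upper")
                            .name("to-upper");

            // Table API: there is no per-operator uid hook; uids are generated from
            // the compiled plan, and this option controls how they are generated.
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
            tEnv.getConfig().set("table.exec.uid.generation", "PLAN_ONLY");

            tEnv.fromDataStream(upper).execute().print();
        }
    }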

