Re: [ANNOUNCE] Donation of Flink CDC into Apache Flink has Completed

2024-03-20 Post by Jark Wu
Congratulations and welcome!

Best,
Jark

On Thu, 21 Mar 2024 at 10:35, Rui Fan <1996fan...@gmail.com> wrote:

> Congratulations!
>
> Best,
> Rui
>
> On Thu, Mar 21, 2024 at 10:25 AM Hang Ruan  wrote:
>
> > Congratulations!
> >
> > Best,
> > Hang
> >
> > Lincoln Lee wrote on Thu, Mar 21, 2024 at 09:54:
> >
> >>
> >> Congrats, thanks for the great work!
> >>
> >>
> >> Best,
> >> Lincoln Lee
> >>
> >>
> >>> Peter Huang wrote on Wed, Mar 20, 2024 at 22:48:
> >>
> >>> Congratulations
> >>>
> >>>
> >>> Best Regards
> >>> Peter Huang
> >>>
> >>> On Wed, Mar 20, 2024 at 6:56 AM Huajie Wang 
> wrote:
> >>>
> 
>  Congratulations
> 
> 
> 
>  Best,
>  Huajie Wang
> 
> 
> 
>  Leonard Xu wrote on Wed, Mar 20, 2024 at 21:36:
> 
> > Hi devs and users,
> >
> > We are thrilled to announce that the donation of Flink CDC as a
> > sub-project of Apache Flink has completed. We invite you to explore
> the new
> > resources available:
> >
> > - GitHub Repository: https://github.com/apache/flink-cdc
> > - Flink CDC Documentation:
> > https://nightlies.apache.org/flink/flink-cdc-docs-stable
> >
> > After the Flink community accepted this donation [1], we have completed
> > the software copyright signing, code repo migration, code cleanup,
> > website migration, CI migration, GitHub issues migration, etc.
> > Here I am particularly grateful to Hang Ruan, Zhongqaing Gong,
> > Qingsheng Ren, Jiabao Sun, LvYanquan, loserwang1024 and other
> > contributors for their contributions and help during this process!
> >
> >
> > For all previous contributors: The contribution process has slightly
> > changed to align with the main Flink project. To report bugs or suggest
> > new features, please open tickets on Apache Jira
> > (https://issues.apache.org/jira). Note that we will no
> > longer accept GitHub issues for these purposes.
> >
> >
> > Welcome to explore the new repository and documentation. Your
> feedback
> > and contributions are invaluable as we continue to improve Flink CDC.
> >
> > Thanks everyone for your support and happy exploring Flink CDC!
> >
> > Best,
> > Leonard
> > [1] https://lists.apache.org/thread/cw29fhsp99243yfo95xrkw82s5s418ob
> >
> >
>


Re: [ANNOUNCE] Apache Flink 1.19.0 released

2024-03-18 Post by Jark Wu
Congrats! Thanks Lincoln, Jing, Yun and Martijn for driving this release.
Thanks to all who were involved in this release!

Best,
Jark


On Mon, 18 Mar 2024 at 16:31, Rui Fan <1996fan...@gmail.com> wrote:

> Congratulations, thanks for the great work!
>
> Best,
> Rui
>
> On Mon, Mar 18, 2024 at 4:26 PM Lincoln Lee 
> wrote:
>
> > The Apache Flink community is very happy to announce the release of
> Apache
> > Flink 1.19.0, which is the first release for the Apache Flink 1.19
> series.
> >
> > Apache Flink® is an open-source stream processing framework for
> > distributed, high-performing, always-available, and accurate data
> streaming
> > applications.
> >
> > The release is available for download at:
> > https://flink.apache.org/downloads.html
> >
> > Please check out the release blog post for an overview of the
> improvements
> > for this release:
> >
> >
> https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/
> >
> > The full release notes are available in Jira:
> >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353282
> >
> > We would like to thank all contributors of the Apache Flink community who
> > made this release possible!
> >
> >
> > Best,
> > Yun, Jing, Martijn and Lincoln
> >
>


Re: [ANNOUNCE] Apache Flink 1.18.0 released

2023-10-26 Post by Jark Wu
Congratulations, and thanks to the release managers and everyone who has
contributed!

Best,
Jark

On Fri, 27 Oct 2023 at 12:25, Hang Ruan  wrote:

> Congratulations!
>
> Best,
> Hang
>
> Samrat Deb wrote on Fri, Oct 27, 2023 at 11:50:
>
> > Congratulations on the great release
> >
> > Bests,
> > Samrat
> >
> > On Fri, 27 Oct 2023 at 7:59 AM, Yangze Guo  wrote:
> >
> > > Great work! Congratulations to everyone involved!
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Fri, Oct 27, 2023 at 10:23 AM Qingsheng Ren 
> wrote:
> > > >
> > > > Congratulations and big THANK YOU to everyone helping with this
> > release!
> > > >
> > > > Best,
> > > > Qingsheng
> > > >
> > > > On Fri, Oct 27, 2023 at 10:18 AM Benchao Li 
> > > wrote:
> > > >>
> > > >> Great work, thanks everyone involved!
> > > >>
> > > >> > Rui Fan <1996fan...@gmail.com> wrote on Fri, Oct 27, 2023 at 10:16:
> > > >> >
> > > >> > Thanks for the great work!
> > > >> >
> > > >> > Best,
> > > >> > Rui
> > > >> >
> > > >> > On Fri, Oct 27, 2023 at 10:03 AM Paul Lam 
> > > wrote:
> > > >> >
> > > >> > > Finally! Thanks to all!
> > > >> > >
> > > >> > > Best,
> > > >> > > Paul Lam
> > > >> > >
> > > >> > > > On Oct 27, 2023 at 03:58, Alexander Fedulov <
> > alexander.fedu...@gmail.com>
> > > wrote:
> > > >> > > >
> > > >> > > > Great work, thanks everyone!
> > > >> > > >
> > > >> > > > Best,
> > > >> > > > Alexander
> > > >> > > >
> > > >> > > > On Thu, 26 Oct 2023 at 21:15, Martijn Visser <
> > > martijnvis...@apache.org>
> > > >> > > > wrote:
> > > >> > > >
> > > >> > > >> Thank you all who have contributed!
> > > >> > > >>
> > > >> > > >> On Thu, Oct 26, 2023 at 18:41, Feng Jin <
> > > jinfeng1...@gmail.com> wrote:
> > > >> > > >>
> > > >> > > >>> Thanks for the great work! Congratulations
> > > >> > > >>>
> > > >> > > >>>
> > > >> > > >>> Best,
> > > >> > > >>> Feng Jin
> > > >> > > >>>
> > > >> > > >>> On Fri, Oct 27, 2023 at 12:36 AM Leonard Xu <
> > xbjt...@gmail.com>
> > > wrote:
> > > >> > > >>>
> > > >> > >  Congratulations, Well done!
> > > >> > > 
> > > >> > >  Best,
> > > >> > >  Leonard
> > > >> > > 
> > > >> > >  On Fri, Oct 27, 2023 at 12:23 AM Lincoln Lee <
> > > lincoln.8...@gmail.com>
> > > >> > >  wrote:
> > > >> > > 
> > > >> > > > Thanks for the great work! Congrats all!
> > > >> > > >
> > > >> > > > Best,
> > > >> > > > Lincoln Lee
> > > >> > > >
> > > >> > > >
> > > >> > > > Jing Ge wrote on Fri, Oct 27, 2023
> > at 00:16:
> > > >> > > >
> > > >> > > >> The Apache Flink community is very happy to announce the
> > > release of
> > > >> > > > Apache
> > > >> > > >> Flink 1.18.0, which is the first release for the Apache
> > > Flink 1.18
> > > >> > > > series.
> > > >> > > >>
> > > >> > > >> Apache Flink® is an open-source unified stream and batch
> > data
> > > >> > >  processing
> > > >> > > >> framework for distributed, high-performing,
> > > always-available, and
> > > >> > > > accurate
> > > >> > > >> data applications.
> > > >> > > >>
> > > >> > > >> The release is available for download at:
> > > >> > > >> https://flink.apache.org/downloads.html
> > > >> > > >>
> > > >> > > >> Please check out the release blog post for an overview of
> > the
> > > >> > > > improvements
> > > >> > > >> for this release:
> > > >> > > >>
> > > >> > > >>
> > > >> > > >
> > > >> > > 
> > > >> > > >>>
> > > >> > > >>
> > > >> > >
> > >
> >
> https://flink.apache.org/2023/10/24/announcing-the-release-of-apache-flink-1.18/
> > > >> > > >>
> > > >> > > >> The full release notes are available in Jira:
> > > >> > > >>
> > > >> > > >>
> > > >> > > >
> > > >> > > 
> > > >> > > >>>
> > > >> > > >>
> > > >> > >
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352885
> > > >> > > >>
> > > >> > > >> We would like to thank all contributors of the Apache
> Flink
> > > >> > > >> community
> > > >> > >  who
> > > >> > > >> made this release possible!
> > > >> > > >>
> > > >> > > >> Best regards,
> > > >> > > >> Konstantin, Qingsheng, Sergey, and Jing
> > > >> > > >>
> > > >> > > >
> > > >> > > 
> > > >> > > >>>
> > > >> > > >>
> > > >> > >
> > > >> > >
> > > >>
> > > >>
> > > >>
> > > >> --
> > > >>
> > > >> Best,
> > > >> Benchao Li
> > >
> >
>


Re: [flink native k8s] With HA enabled, the taskmanager pods keep restarting

2022-08-31 Post by Wu,Zhiheng
I cannot find the TM logs, because the pod dies before the TM even starts up.
Let me check whether that is the cause; we indeed have not added the -Dkubernetes.taskmanager.service-account parameter.
Is the -Dkubernetes.taskmanager.service-account parameter supposed to be added when starting the session cluster with ./bin/kubernetes-session.sh?

On 2022/8/31 at 4:10 PM, "Yang Wang" wrote:

My guess is that you did not set a service account for the TM, so the TM has no permission to fetch the leader from the K8s ConfigMap and therefore cannot register with the RM and JM.

-Dkubernetes.taskmanager.service-account=wuzhiheng \
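For illustration, the session start command from the original post would then carry both service-account options (a sketch; the remaining options from the command quoted below stay the same and are omitted here):

./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=realtime-monitor \
    -Dkubernetes.namespace=monitor \
    -Dkubernetes.jobmanager.service-account=wuzhiheng \
    -Dkubernetes.taskmanager.service-account=wuzhiheng \
    -Dtaskmanager.numberOfTaskSlots=6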


Best,
Yang

Xuyang wrote on Tue, Aug 30, 2022 at 23:22:

> Hi, could you paste the TM logs? From the WARN logs it looks like the TM never manages to start up.
> On 2022-08-30 03:45:43, "Wu,Zhiheng" wrote:
> >[Problem description]
> >After enabling the HA configuration, the taskmanager pods keep cycling through create-terminate-create and the job cannot start.
> >
> >1. Job configuration and startup procedure
> >
> >a)  Modify the conf/flink.yaml configuration file and add the HA settings
> >kubernetes.cluster-id: realtime-monitor
> >high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> >high-availability.storageDir: file:///opt/flink/checkpoint/recovery/monitor
> // This is an NFS path, mounted into the pod via a PVC
> >
> >b)  First create a stateless deployment with the following command to set up a session cluster
> >
> >./bin/kubernetes-session.sh \
> >
> 
>-Dkubernetes.secrets=cdn-res-bd-keystore:/opt/flink/kafka/res/keystore/bd,cdn-res-bd-truststore:/opt/flink/kafka/res/truststore/bd,cdn-res-bj-keystore://opt/flink/kafka/res/keystore/bj,cdn-res-bj-truststore:/opt/flink/kafka/res/truststore/bj
> \
> >
> >-Dkubernetes.pod-template-file=./conf/pod-template.yaml \
> >
> >-Dkubernetes.cluster-id=realtime-monitor \
> >
> >-Dkubernetes.jobmanager.service-account=wuzhiheng \
> >
> >-Dkubernetes.namespace=monitor \
> >
> >-Dtaskmanager.numberOfTaskSlots=6 \
> >
> >-Dtaskmanager.memory.process.size=8192m \
> >
> >-Djobmanager.memory.process.size=2048m
> >
> >c)  Finally submit a jar job through the web UI; the jobmanager then prints the following logs
> >
> >2022-08-29 23:49:04,150 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Pod
> realtime-monitor-taskmanager-1-13 is created.
> >
> >2022-08-29 23:49:04,152 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Pod
> realtime-monitor-taskmanager-1-12 is created.
> >
> >2022-08-29 23:49:04,161 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Received
> new TaskManager pod: realtime-monitor-taskmanager-1-12
> >
> >2022-08-29 23:49:04,162 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Requested worker realtime-monitor-taskmanager-1-12 with resource spec
> WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes),
> taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes),
> managedMemSize=0 bytes, numSlots=6}.
> >
> >2022-08-29 23:49:04,162 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Received
> new TaskManager pod: realtime-monitor-taskmanager-1-13
> >
> >2022-08-29 23:49:04,162 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Requested worker realtime-monitor-taskmanager-1-13 with resource spec
> WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes),
> taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes),
> managedMemSize=0 bytes, numSlots=6}.
> >
> >2022-08-29 23:49:07,176 WARN
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Reaching max start worker failure rate: 12 events detected in the recent
> interval, reaching the threshold 10.00.
> >
> >2022-08-29 23:49:07,176 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Will not retry creating worker in 3000 ms.
> >
> >2022-08-29 23:49:07,176 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Worker realtime-monitor-taskmanager-1-12 with resource spec
> WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes),
> taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes),
> managedMemSize=0 bytes, numSlots=6} was requested in current attempt and
> has not registered. Current pending count after removing: 1.
> >
> >2022-08-29 23:49:07,176 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Worker realtime-monitor-taskmanager-1-12 is terminated. Diagnostics: Pod
> terminated, container termination statuses:
> [flink-main-container(exitCode=1, reason=Error, message=null)], pod 
status:
> Failed(reason=

[flink native k8s] With HA enabled, the taskmanager pods keep restarting

2022-08-29 Post by Wu,Zhiheng
[Problem description]
After enabling the HA configuration, the taskmanager pods keep cycling through create-terminate-create and the job cannot start.

1. Job configuration and startup procedure

a)  Modify the conf/flink.yaml configuration file and add the HA settings
kubernetes.cluster-id: realtime-monitor
high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///opt/flink/checkpoint/recovery/monitor
   // This is an NFS path, mounted into the pod via a PVC

b)  First create a stateless deployment with the following command to set up a session cluster

./bin/kubernetes-session.sh \

-Dkubernetes.secrets=cdn-res-bd-keystore:/opt/flink/kafka/res/keystore/bd,cdn-res-bd-truststore:/opt/flink/kafka/res/truststore/bd,cdn-res-bj-keystore://opt/flink/kafka/res/keystore/bj,cdn-res-bj-truststore:/opt/flink/kafka/res/truststore/bj
 \

-Dkubernetes.pod-template-file=./conf/pod-template.yaml \

-Dkubernetes.cluster-id=realtime-monitor \

-Dkubernetes.jobmanager.service-account=wuzhiheng \

-Dkubernetes.namespace=monitor \

-Dtaskmanager.numberOfTaskSlots=6 \

-Dtaskmanager.memory.process.size=8192m \

-Djobmanager.memory.process.size=2048m

c)  Finally submit a jar job through the web UI; the jobmanager then prints the following logs

2022-08-29 23:49:04,150 INFO  
org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Pod 
realtime-monitor-taskmanager-1-13 is created.

2022-08-29 23:49:04,152 INFO  
org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Pod 
realtime-monitor-taskmanager-1-12 is created.

2022-08-29 23:49:04,161 INFO  
org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Received new 
TaskManager pod: realtime-monitor-taskmanager-1-12

2022-08-29 23:49:04,162 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Requested worker realtime-monitor-taskmanager-1-12 with resource spec 
WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes), 
taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes), 
managedMemSize=0 bytes, numSlots=6}.

2022-08-29 23:49:04,162 INFO  
org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Received new 
TaskManager pod: realtime-monitor-taskmanager-1-13

2022-08-29 23:49:04,162 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Requested worker realtime-monitor-taskmanager-1-13 with resource spec 
WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes), 
taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes), 
managedMemSize=0 bytes, numSlots=6}.

2022-08-29 23:49:07,176 WARN  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Reaching max start worker failure rate: 12 events detected in the recent 
interval, reaching the threshold 10.00.

2022-08-29 23:49:07,176 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Will 
not retry creating worker in 3000 ms.

2022-08-29 23:49:07,176 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Worker realtime-monitor-taskmanager-1-12 with resource spec WorkerResourceSpec 
{cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes), taskOffHeapSize=0 
bytes, networkMemSize=711.680mb (746250577 bytes), managedMemSize=0 bytes, 
numSlots=6} was requested in current attempt and has not registered. Current 
pending count after removing: 1.

2022-08-29 23:49:07,176 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Worker realtime-monitor-taskmanager-1-12 is terminated. Diagnostics: Pod 
terminated, container termination statuses: [flink-main-container(exitCode=1, 
reason=Error, message=null)], pod status: Failed(reason=null, message=null)

2022-08-29 23:49:07,176 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Requesting new worker with resource spec WorkerResourceSpec {cpuCores=6.0, 
taskHeapSize=6.005gb (6447819631 bytes), taskOffHeapSize=0 bytes, 
networkMemSize=711.680mb (746250577 bytes), managedMemSize=0 bytes, 
numSlots=6}, current pending count: 2.

2022-08-29 23:49:07,514 WARN  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Reaching max start worker failure rate: 13 events detected in the recent 
interval, reaching the threshold 10.00.

2022-08-29 23:49:07,514 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Worker realtime-monitor-taskmanager-1-13 with resource spec WorkerResourceSpec 
{cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes), taskOffHeapSize=0 
bytes, networkMemSize=711.680mb (746250577 bytes), managedMemSize=0 bytes, 
numSlots=6} was requested in current attempt and has not registered. Current 
pending count after removing: 1.

2022-08-29 23:49:07,514 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Worker realtime-monitor-taskmanager-1-13 is terminated. Diagnostics: Pod 
terminated, container termination statuses: [flink-main-container(exitCode=1, 
reason=Error, message=null)], pod status: Failed(reason=null, message=null)

2022-08-29 23:49:07,515 INFO  

Re: [ANNOUNCE] Welcome to join the Apache Flink community on Slack

2022-06-02 Post by Jark Wu
Thanks Xintong for making this possible!

Cheers,
Jark Wu

On Thu, 2 Jun 2022 at 15:31, Xintong Song  wrote:

> Hi everyone,
>
> I'm very happy to announce that the Apache Flink community has created a
> dedicated Slack workspace [1]. Welcome to join us on Slack.
>
> ## Join the Slack workspace
>
> You can join the Slack workspace by either of the following two ways:
> 1. Click the invitation link posted on the project website [2].
> 2. Ask anyone who already joined the Slack workspace to invite you.
>
> We recommend 2), if available. Due to Slack limitations, the invitation
> link in 1) expires and needs manual updates after every 100 invites. If it
> is expired, please reach out to the dev / user mailing lists.
>
> ## Community rules
>
> When using the community Slack workspace, please follow these community
> rules:
> * *Be respectful* - This is the most important rule!
> * All important decisions and conclusions *must be reflected back to the
> mailing lists*. "If it didn’t happen on a mailing list, it didn’t happen."
> - The Apache Mottos [3]
> * Use *Slack threads* to keep parallel conversations from overwhelming a
> channel.
> * Please *do not direct message* people for troubleshooting, Jira assignment
> and PR review. These should be picked up voluntarily.
>
>
> ## Maintenance
>
>
> Committers can refer to this wiki page [4] for information needed for
> maintaining the Slack workspace.
>
>
> Thanks Jark, Martijn and Robert for helping setting up the Slack workspace.
>
>
> Best,
>
> Xintong
>
>
> [1] https://apache-flink.slack.com/
>
> [2] https://flink.apache.org/community.html#slack
>
> [3] http://theapacheway.com/on-list/
>
> [4] https://cwiki.apache.org/confluence/display/FLINK/Slack+Management
>


GlobalCommitter in Flink's two-phase commit

2022-05-24 Post by di wu
Hello
Regarding the GlobalCommitter in Flink's two-phase commit,
I see it was introduced in FLIP-143, but it seems to have been removed again in
FLIP-191 and marked as Deprecated in the source code.
I haven't found any relevant information about the use of GlobalCommitter.


There are two questions I would like to ask:
1. What are the general usage scenarios of GlobalCommitter?
2. Why should GlobalCommitter be removed in the new version of the api?


Thanks & Regards,


di.wu

Re: flinkCDC2.1.1

2022-01-06 Post by Jark Wu
Issues with Flink CDC can be reported at
https://github.com/ververica/flink-cdc-connectors/issues

On Thu, 30 Dec 2021 at 14:08, Liu Join  wrote:

> Using flinkCDC 2.1.1 to read MySQL data, an error is reported after running for a while
> Image hosting link: error screenshot 
>
>
>
> Sent from Mail for Windows
>
>
>


Re: The mailing list archive cannot be accessed

2022-01-06 Post by Jark Wu
The Nabble service is down; please use this address instead: https://lists.apache.org/list.html?d...@flink.apache.org

On Fri, 31 Dec 2021 at 18:29, Ada Wong  wrote:

> I wanted to read that discussion, but this link cannot be accessed.
>
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Move-Flink-ML-pipeline-API-and-library-code-to-a-separate-repository-named-flink-ml-tc49420.html
>


Re: flink mysql cdc sync reports an unrecognized field

2022-01-06 Post by Jark Wu
This error log is probably unrelated; it is an error from the REST client, not from the normal data-processing path.

mysql-cdc does not contain any Jackson JSON parsing code.

On Wed, 5 Jan 2022 at 17:09, Fei Han 
wrote:

>
> @all:
> Flink mysql cdc data sync reports an unrecognized field. What could cause this? Could it be an unrecognized keyword? The error log is as follows:
>
>  httpResponseStatus=200 OK}
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException:
> Unrecognized field "status" (class
> org.apache.flink.runtime.rest.messages.ErrorResponseBody), not marked as
> ignorable (one known property: "errors"])
>  at [Source: UNKNOWN; line: -1, column: -1] (through reference chain:
> org.apache.flink.runtime.rest.messages.ErrorResponseBody["status"])
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:987)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1974)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1686)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1635)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:541)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1390)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:362)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:195)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:4569)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2798)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.treeToValue(ObjectMapper.java:3261)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:483)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:466)
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
> at
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
> [?:1.8.0_211]
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> [?:1.8.0_211]
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> [?:1.8.0_211]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_211]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_211]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_211]
>


Re: [ANNOUNCE] Flink mailing lists archive service has migrated to Apache Archive service

2021-09-06 Post by Jark Wu
Thanks Leonard,

I have seen many users complaining that the Flink mailing list doesn't
work (they were using Nabble).
I think this information would be very helpful.

Best,
Jark

On Mon, 6 Sept 2021 at 16:39, Leonard Xu  wrote:

> Hi, all
>
> The mailing list archive service Nabble Archive broke at the end of
> June. The Flink community has migrated the mailing list archives [1] to
> the Apache Archive service in commit [2]; you can refer to [3] to learn more
> about Flink's mailing list archives.
>
> The Apache Archive service is maintained by the ASF, so its stability is
> guaranteed. It’s a web-based mail archive service which allows you to
> browse, search, interact with, subscribe to, and unsubscribe from mailing lists.
>
> The Apache Archive service shows the mails of the last month by default; you can
> specify a date range to browse and search historical mails.
>
>
> Hope it would be helpful.
>
> Best,
> Leonard
>
> [1] The Flink mailing lists in Apache archive service
> dev mailing list archives:
> https://lists.apache.org/list.html?d...@flink.apache.org
> user mailing list archives :
> https://lists.apache.org/list.html?u...@flink.apache.org
> user-zh mailing list archives :
> https://lists.apache.org/list.html?user-zh@flink.apache.org
> [2]
> https://github.com/apache/flink-web/commit/9194dda862da00d93f627fd315056471657655d1
> [3] https://flink.apache.org/community.html#mailing-lists
>


Re: Chinese tutorial not being updated in time

2021-06-23 Post by Jark Wu
Hi Kevin,

Welcome to the Apache Flink open source community! As Yun Tang said, the community warmly welcomes and values every contribution.
However, maintaining the Chinese documentation is a very large effort that touches all modules and requires the cooperation of committers from many modules,
so updates will sometimes inevitably lag behind.

If you find an untranslated page with no corresponding JIRA issue, feel free to create an issue and submit a PR.
If a related issue and PR already exist, you can help review them; the community is currently short of high-quality reviewers, which would speed up a lot of the translation work.

Best,
Jark

On Wed, 23 Jun 2021 at 11:04, Yun Tang  wrote:

> Hi Kevin,
>
> Welcome to the Apache Flink open source community!
>
> Because open source community work is often done outside of working hours, contributors may inevitably fall behind on updates or become inactive for long periods.
>
> You are very welcome to comment under the relevant JIRA tickets and request permission to create PRs. The community always welcomes every contributor, and documentation maintenance, especially the translation of the Chinese docs, is very much needed. If there is anything you would like to contribute, feel free to comment directly under the JIRA ticket or GitHub PR, or create a new ticket.
>
> Best,
> Yun Tang
> --
> *From:* pang fan 
> *Sent:* Monday, June 21, 2021 21:35
> *To:* user-zh@flink.apache.org 
> *Subject:* Chinese tutorial not being updated in time
>
> Hi all,
>
> I am a Flink beginner. While learning from the official tutorial (Chinese version) at
>
> https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/try-flink/table_api/
>
> I found that many parts of the Chinese tutorial have not been translated. Looking at the PR history, I also found that many PRs have been opened but never merged into the main branch; many of them were opened months ago and have not been updated since.
>
> Is anyone still following up on these issues? If so, could the status of the JIRA tickets and the PRs be updated, so that we can also claim tickets and contribute to the community where needed?
>
> Thank you!
> Kevin Fan
>
>


Re: Can hbase async lookup guarantee ordered output?

2021-06-17 Post by Jark Wu
You can take a look at the source code of AsyncWaitOperator for how this is implemented.

Best,
Jark

On Tue, 15 Jun 2021 at 18:53, zilong xiao  wrote:

> I'd like to understand how this guarantees 100% ordering. It feels like asynchronous lookups cannot guarantee the order of the query results, for example due to network latency.
>
> Jingsong Li wrote on Tue, Jun 15, 2021 at 5:07 PM:
>
> > It is ordered.
> >
> > The unordered mode is not supported yet, as it could affect the correctness of streaming computations.
> >
> > Best,
> > Jingsong
> >
> > On Tue, Jun 15, 2021 at 3:42 PM zilong xiao  wrote:
> >
> > > Hi community experts, may I ask whether the hbase async lookup in Flink 1.13 can guarantee ordered output?
> > >
> >
> >
> > --
> > Best, Jingsong Lee
> >
>


Re: Unsubscribe from the mailing list

2021-06-17 Post by Jark Wu
To unsubscribe, please send an email to user-zh-unsubscr...@flink.apache.org rather than user-zh@flink.apache.org

Best,
Jark

On Thu, 17 Jun 2021 at 09:29, wangweigu...@stevegame.cn <
wangweigu...@stevegame.cn> wrote:

>
> My email address has changed; please unsubscribe!
>
>
>
>


Re: Unsubscribe

2021-06-17 Post by Jark Wu
To unsubscribe, please send an email to user-zh-unsubscr...@flink.apache.org rather than user-zh@flink.apache.org

Best,
Jark

On Tue, 15 Jun 2021 at 23:56, frank.liu  wrote:

> Unsubscribe
>
>
> | |
> frank.liu
> |
> |
> frank...@163.com
> |
> Signature customized by NetEase Mail Master


Re: Re: After computations with flink sql cdc, a multi-parallelism elasticsearch connector sink has problems, occurring intermittently! The data volume is large and we have not been able to reproduce it reliably. Has anyone run into a similar problem?

2021-06-17 Post by Jark Wu
The community has recently redesigned the mysql-cdc implementation to support parallel reads and checkpoints during the snapshot phase and to remove the dependency on the global lock.
You can follow the GitHub repository for updates: https://github.com/ververica/flink-cdc-connectors.
The related design and implementation will also be presented at the July meetup; stay tuned.

Best,
Jark

On Thu, 17 Jun 2021 at 09:34, casel.chen  wrote:

> When will Flink CDC support changing the parallelism for fine-grained resource control? I'm also hitting a problem where flink sql
> cdc writing to mysql cannot keep up with the incoming write rate. When will a mechanism like MySQL's parallel replication be supported?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2021-06-16 17:27:14, "Leonard Xu" wrote:
> >This does not look closely related to Flink-CDC; the exception stack shows version_conflict_engine_exception thrown on the ES side.
> >You can look into that exception and check whether there is a write conflict (other jobs/businesses also writing to the synced table).
> >
> >Best,
> >Leonard
> >
> >> On Jun 16, 2021 at 17:05, mokaful <649713...@qq.com> wrote:
> >>
> >> Same problem here. Is there any way to handle it?
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink sql count distinct optimization

2021-03-26 Post by Jark Wu
> So if it is not a window agg, Flink will automatically split the aggregation once the parameters are enabled, right?
Yes.

> As for window agg not being split automatically, can this be found in the documentation?
The documentation does not mention it. Everything described in this doc [1] is an optimization for unbounded aggregations (a small sketch follows the link below).

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
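For an unbounded (non-window) aggregation, a minimal sketch of enabling the automatic split, reusing the options and query from the quoted mail below:

SET table.optimizer.distinct-agg.split.enabled = true;
SET table.optimizer.distinct-agg.split.bucket-num = 1024;
-- the query can stay in its simple one-stage form; the planner rewrites it internally:
SELECT day, COUNT(DISTINCT buy_id) AS cnt FROM T GROUP BY day;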

On Fri, 26 Mar 2021 at 11:00, guomuhua <663021...@qq.com> wrote:

> Jark wrote
> > I see your job uses a window agg; currently window agg does not support automatic splitting. In 1.13, the window
> > agg based on window TVF supports this parameter. Stay tuned.
> >
> > Best,
> > Jark
> >
> > On Wed, 24 Mar 2021 at 19:29, Robin Zhang 
>
> > vincent2015qdlg@
>
> > 
> > wrote:
> >
> >> Hi, guomuhua
> >>   With local aggregation enabled, you do not need to split the keys yourself for a two-stage aggregation. I suggest reading the official documentation.
> >>
> >> Best,
> >> Robin
> >>
> >>
> >> guomuhua wrote
> >> > 在SQL中,如果开启了 local-global 参数:set
> >> > table.optimizer.agg-phase-strategy=TWO_PHASE;
> >> > 或者开启了Partial-Final 参数:set
> >> table.optimizer.distinct-agg.split.enabled=true;
> >> >  set
> >> > table.optimizer.distinct-agg.split.bucket-num=1024;
> >> > 还需要对应的将SQL改写为两段式吗?
> >> > 例如:
> >> > 原SQL:
> >> > SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
> >> >
> >> > 对所需DISTINCT字段buy_id模1024自动打散后,SQL:
> >> > SELECT day, SUM(cnt) total
> >> > FROM (
> >> > SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
> >> > FROM T GROUP BY day, MOD(buy_id, 1024))
> >> > GROUP BY day
> >> >
> >> > 还是flink会帮我自动改写SQL,我不用关心?
> >> >
> >> > 另外,如果只设置开启上述参数,没有改写SQL,感觉没有优化,在flink web ui界面上也没有看到两阶段算子
> >> > 
> >>
> http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png
> ;
> >>
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > --
> >> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >>
> >>
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-flink.147419.n8.nabble.com/
> >>
>
> Thanks. So if it is not a window agg, Flink will split automatically once the parameters are enabled, right? As for window agg
> not being split automatically, can this be found in the documentation? Where exactly? Or do I need to dig into the source code? Please advise. Thanks again.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink sql count distinct optimization

2021-03-25 Post by Jark Wu
I see your job uses a window agg; currently window agg does not support automatic splitting. In 1.13, the window
agg based on window TVF supports this parameter. Stay tuned.

Best,
Jark

On Wed, 24 Mar 2021 at 19:29, Robin Zhang 
wrote:

> Hi, guomuhua
>   With local aggregation enabled, you do not need to split the keys yourself for a two-stage aggregation. I suggest reading the official documentation.
>
> Best,
> Robin
>
>
> guomuhua wrote
> > In SQL, if I enable the local-global parameter: set
> > table.optimizer.agg-phase-strategy=TWO_PHASE;
> > or enable the Partial-Final parameters: set
> > table.optimizer.distinct-agg.split.enabled=true;
> > set
> > table.optimizer.distinct-agg.split.bucket-num=1024;
> > do I still need to rewrite the SQL into the two-stage form accordingly?
> > For example:
> > Original SQL:
> > SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
> >
> > After manually splitting the DISTINCT field buy_id modulo 1024, the SQL becomes:
> > SELECT day, SUM(cnt) total
> > FROM (
> > SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
> > FROM T GROUP BY day, MOD(buy_id, 1024))
> > GROUP BY day
> >
> > Or will Flink rewrite the SQL for me automatically, so I don't need to care?
> >
> > Also, with only the above parameters enabled and without rewriting the SQL, there seems to be no optimization, and I don't see any two-stage operators in the Flink web UI either
> > 
> http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png;
>
> >
> >
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Dynamically overriding parameters in SQL

2021-03-04 Post by Jark Wu
This looks like a bug in node reuse during the multi-sink (block) optimization; please open an issue on JIRA.

Best,
Jark

On Thu, 4 Mar 2021 at 19:37, 酷酷的浑蛋  wrote:

> StatementSet statementSet = tableEnvironment.createStatementSet();
> String sql1 = "insert into test select a,b,c from test_a_12342 /*+
> OPTIONS('table-name'='test_a_1')*/";
> String sql2 = "insert into test select a,b,c from test_a_12342 /*+
> OPTIONS('table-name'='test_a_2')*/";
> statementSet.addInsertSql(sql1);
> statementSet.addInsertSql(sql2);
> statementSet.execute();
>
>
> The SQL code is as above. After the final insert, the data of table test_a_1 is inserted twice, while the data of test_a_2 is not inserted at all. Is this a bug?


Re: Question about the ChangelogNormalize stage generated by the Flink SQL upsert-kafka connector

2021-03-04 Post by Jark Wu
1. For upsert-kafka, ChangelogNormalize is added by default.
2. ChangelogNormalize uses the env parallelism, so it can be considered to break the parallelism limit you mentioned. It can also be used with kafka + canal-json,
but you need to set the option table.exec.source.cdc-events-duplicate = true [1]
to enable it (see the sketch after the reference link below). Note, however, that ChangelogNormalize is a stateful node with its own performance overhead,
so the overall performance may end up worse than a plain forward.

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate
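For illustration, a minimal sketch for the scenario from the question below, using a plain kafka + canal-json source. The topic name and server address are made-up placeholders; a primary key must be declared so the deduplication has a key to work on:

SET table.exec.source.cdc-events-duplicate = true;

CREATE TABLE table_b (
  id BIGINT,
  a  STRING,
  b  STRING,
  c  STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'source_topic',
  'properties.bootstrap.servers' = '...',
  'format' = 'canal-json'
);

INSERT INTO table_a SELECT id, udf(a), b, c FROM table_b;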

On Thu, 4 Mar 2021 at 15:30, Qishang  wrote:

> Hi community.
> Flink 1.12.1
>
> Our current pipeline ingests data from Kafka via canal-json and writes it into a DB. But since the topic has only 1 partition, setting a higher parallelism has no effect
> on an ETL job that is forward-only.
>
> insert into table_a select id,udf(a),b,c from table_b;
>
> I noticed that the stages generated by EXPLAIN for the upsert-kafka connector include a ChangelogNormalize stage that can be repartitioned.
> 1. Is ChangelogNormalize added by default, or can it be configured somewhere?
> 2. Can this break the parallelism limit imposed by the default Kafka partitioning? Does it only take effect with upsert-kafka? Can it be used in the scenario I described above?
>
> ```
> == Physical Execution Plan ==
> Stage 1 : Data Source
> content : Source: TableSourceScan(table=[[default_catalog,
> default_database, temp_table]], fields=[id...])
>
> Stage 3 : Operator
> content : ChangelogNormalize(key=[id])
> ship_strategy : HASH
>
> Stage 4 : Operator
> content : Calc(select=[...])
> ship_strategy : FORWARD
>
> Stage 5 : Data Sink
> content : Sink: Sink(table=[default_catalog.default_database.table_a],
> fields=[id...])
> ship_strategy : FORWARD
> ```
>


Re: [DISCUSS] Deprecation and removal of the legacy SQL planner

2021-03-04 Post by Jark Wu
big +1 from my side.

Best,
Jark

On Thu, 4 Mar 2021 at 20:59, Leonard Xu  wrote:

> +1 for the roadmap.
>
> Thanks Timo for driving this.
>
> Best,
> Leonard
>
> > 在 2021年3月4日,20:40,Timo Walther  写道:
> >
> > Last call for feedback on this topic.
> >
> > It seems everyone agrees to finally complete FLIP-32. Since FLIP-32 has
> been accepted for a very long time, I think we don't need another voting
> thread for executing the last implementation step. Please let me know if
> you think differently.
> >
> > I will start deprecating the affected classes and interfaces beginning
> of next week.
> >
> > Regards,
> > Timo
> >
> >
> > On 26.02.21 15:46, Seth Wiesman wrote:
> >> Strong +1
> >> Having two planners is confusing to users and the diverging semantics
> make
> >> it difficult to provide useful learning material. It is time to rip the
> >> bandage off.
> >> Seth
> >> On Fri, Feb 26, 2021 at 12:54 AM Kurt Young  wrote:
> >>>  breaking
> >>> change.>
> >>>
> >>> Hi Timo,
> >>>
> >>> First of all I want to thank you for introducing this planner design
> back
> >>> in 1.9, this is a great work
> >>> that allows lots of blink features to be merged to Flink in a
> reasonably
> >>> short time. It greatly
> >>> accelerates the evolution speed of Table & SQL.
> >>>
> >>> Everything comes with a cost, as you said, right now we are facing the
> >>> overhead of maintaining
> >>> two planners and it causes bugs and also increases imbalance between
> these
> >>> two planners. As
> >>> a developer and also for the good of all Table & SQL users, I also
> think
> >>> it's better for us to be more
> >>> focused on a single planner.
> >>>
> >>> Your proposed roadmap looks good to me, +1 from my side and thanks
> >>> again for all your efforts!
> >>>
> >>> Best,
> >>> Kurt
> >>>
> >>>
> >>> On Thu, Feb 25, 2021 at 5:01 PM Timo Walther 
> wrote:
> >>>
>  Hi everyone,
> 
>  since Flink 1.9 we have supported two SQL planners. Most of the
> original
>  plan of FLIP-32 [1] has been implemented. The Blink code merge has
> been
>  completed and many additional features have been added exclusively to
>  the new planner. The new planner is now in a much better shape than
> the
>  legacy one.
> 
>  In order to avoid user confusion, reduce duplicate code, and improve
>  maintainability and testing times of the Flink project as a whole we
>  would like to propose the following steps to complete FLIP-32:
> 
>  In Flink 1.13:
>  - Deprecate the `flink-table-planner` module
>  - Deprecate `BatchTableEnvironment` for both Java, Scala, and Python
> 
>  In Flink 1.14:
>  - Drop `flink-table-planner` early
>  - Drop many deprecated interfaces and API on demand
>  - Rename `flink-table-planner-blink` to `flink-table-planner`
>  - Rename `flink-table-runtime-blink` to `flink-table-runtime`
>  - Remove references of "Blink" in the code base
> 
>  This will have an impact on users that still use DataSet API together
>  with Table API. With this change we will not support converting
> between
>  DataSet API and Table API anymore. We hope to compensate the missing
>  functionality in the new unified TableEnvironment and/or the batch
> mode
>  in DataStream API during 1.14 and 1.15. For this, we are looking for
>  further feedback which features are required in Table API/DataStream
> API
>  to have a smooth migration path.
> 
>  Looking forward to your feedback.
> 
>  Regards,
>  Timo
> 
>  [1]
> 
> 
> >>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions
> 
> >>>
> >
>
>


Re: [DISCUSS] Correct time-related function behavior in Flink SQL

2021-01-20 Post by Jark Wu
Great examples to understand the problem and the proposed changes, @Kurt!

Thanks Leonard for investigating this problem.
The time-zone problems around time functions and windows have bothered a
lot of users. It's time to fix them!

The return value changes sound reasonable to me, and keeping the return
type unchanged will minimize the surprise to the users.
Besides that, I think it would be better to mention how this affects the
window behaviors, and the interoperability with DataStream.

I think this definitely deserves a FLIP.



Hi zhisheng,

Do you have examples to illustrate which case will get the wrong window
boundaries?
That will help to verify whether the proposed changes can solve your
problem.

Best,
Jark


On Thu, 21 Jan 2021 at 12:54, zhisheng <173855...@qq.com> wrote:

> Thanks to Leonard Xu for discussing this tricky topic. At present, there
> are many Flink jobs in our production environment that are used to count
> day-level reports (eg: count PV/UV ).
>
>
> If the default Flink SQL is used, the window time range of the
> statistics is incorrect, and then the statistical results will naturally be
> incorrect.
>
>
> The user needs to deal with the time zone manually in order to solve the
> problem.
>
>
> If Flink itself can solve these time zone issues, then I think it will be
> user-friendly.
>
>
> Thank you
>
>
> Best!
> zhisheng
>
>
> -- Original Message --
> From: "dev" <xbjt...@gmail.com>
> Sent: Tuesday, Jan 19, 2021, 6:35 PM
> To: "dev"
> Subject: Re: [DISCUSS] Correct time-related function behavior in Flink SQL
>
>
>
> I found above example format may mess up in different mail client, I post
> a picture here[1].
>
> Best,
> Leonard
>
> [1]
> https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/pictures/CURRRENT_TIMESTAMP.png
> <
> https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/pictures/CURRRENT_TIMESTAMP.png;
>
>
>  On Jan 19, 2021 at 16:22, Leonard Xu wrote:
>  Hi, all
> 
>  I want to start the discussion about correcting time-related function
> behavior in Flink SQL, this is a tricky topic but I think it’s time to
> address it.
> 
>  Currently some temporal function behaviors are weird to users.
>  1. When users use a PROCTIME() in SQL, the value of PROCTIME()
> has a timezone offset with the wall-clock time in users' local time zone,
> users need to add their local time zone offset manually to get expected
> local timestamp(e.g: Users in Germany need to +1h to get expected local
> timestamp).
> 
>  2. Users can not use
> CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP to get wall-clock
> timestamp in local time zone, and thus they need write UDF in their SQL
> just for implementing a simple filter like WHERE date_col =
> CURRENT_DATE.
> 
>  3. Another common case is the time window with day
> interval based on PROCTIME(), user plan to put all data from one day into
> the same window, but the window is assigned using timestamp in UTC+0
> timezone rather than the session timezone which leads to the window starts
> with an offset(e.g: Users in China need to add -8h in their business sql
> start and then +8h when output the result, the conversion like a magic for
> users).
> 
>  These problems come from that lots of time-related functions like
> PROCTIME(), NOW(), CURRENT_DATE, CURRENT_TIME and CURRENT_TIMESTAMP are
> returning time values based on UTC+0 time zone.
> 
>  This topic will lead to a comparison of the three types, i.e.
> TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE, TIMESTAMP WITH LOCAL TIME ZONE and
> TIMESTAMP WITH TIME ZONE. In order to better understand the three types, I
> wrote a document[1] to help understand them better. You can also know the
> tree timestamp types behavior in Hadoop ecosystem from the reference link
> int the doc.
> 
> 
>  I investigated all Flink time-related functions' current behavior and
> compared with other DB vendors like Pg,Presto, Hive, Spark,
> Snowflake, I made an excel [2] to organize them well, we can use it
> for the next discussion. Please let me know if I missed something.
>  From my investigation, I think we need to correct the behavior of
> function NOW()/PROCTIME()/CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP, to
> correct them, we can change the function return type or function return
> value or change return type and return value both. All of those way are
> valid because SQL:2011 does not specify the function return type and every
> SQL engine vendor has its own implementation. For example the
> CURRENT_TIMESTAMP function,
> 
>  (Table columns: FLINK current behavior | existing problem | other vendors'
> behavior | proposed change)
>  CURRENT_TIMESTAMP  CURRENT_TIMESTAMP
>  TIMESTAMP(0) NOT NULL
> 
>  #session timezone: UTC
>  2020-12-28T23:52:52
> 
>  #session timezone: UTC+8
>  2020-12-28T23:52:52
> 
>  wall clock:
>  UTC+8: 

Re: Can flink-sql clear all the State in memory at a specified time?

2021-01-13 Post by Jark Wu
Why not use a day-level tumble window? It clears the state for you automatically.
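A rough sketch of that suggestion against the dwd_order_info table from the quoted SQL below. It assumes a processing-time attribute proc_time is declared on the table, only two aggregates are shown, and whether it applies directly depends on the source type:

SELECT
  TUMBLE_START(proc_time, INTERVAL '1' DAY) AS day_start,
  UserId,
  COUNT(Code)          AS SendTime,
  SUM(PayAmount / 100) AS SendCashcharge
FROM dwd_order_info
GROUP BY TUMBLE(proc_time, INTERVAL '1' DAY), UserId;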

On Wed, 6 Jan 2021 at 13:53, 徐州州 <25977...@qq.com> wrote:

> I'm working on a Flink real-time data warehouse project with a daily-report scenario, running in flink-on-yarn mode. If the job is not stopped, the second day's results accumulate on top of the first day's. My read rule (data TimeStamp > current_date) treats the data as belonging to today, but if the cluster keeps running, the second day's results accumulate on the first day's. The code only sets env.setStateBackend(new
> MemoryStateBackend()). At the moment I have to restart the job every day to release the State in memory and avoid accumulating on top of yesterday's results. My source is the upsert-kafka connector, and the SQL is written on top of the dwd layer. Below is the exact SQL I run; all tables used come from upsert-kafka sources in the dwd layer.
> |  select
> |   TO_DATE(cast(doi.DeliveryTime as
> String),'yyyy-MM-dd') as  days,
> |   doi.UserId,
> |   count(doi.Code) as   SendTime,
> |   sum(doi.PayAmount / 100) as SendCashcharge,
> |   sum(doi.PayAmount / 100 - ChargeAmount / 100 +
> UseBalance / 100) as  SendCashuse,
> |   sum(doi.CashMoney / 100)as  SendCash
> |from dwd_order_info doi
> |where doi.DeliveryTime > cast(current_date AS
> TIMESTAMP) and doi.OrderType = 29 and doi.Status >= 50 and doi.Status
> < 60
> |group by TO_DATE(cast(doi.DeliveryTime as
> String),'yyyy-MM-dd'), doi.UserId


Re: Don't Flink operators have something like Spark's cache operation?

2021-01-13 Post by Jark Wu
The community is already working on this; you can follow this FLIP:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink

On Fri, 8 Jan 2021 at 15:42, 张锴  wrote:

> You can use state to store intermediate values.
>
> 李继 wrote on Thu, Jan 7, 2021 at 5:42 PM:
>
> > Hi, when an operator's result is used multiple times, how can I cache it, similar to Spark's cache operation?
> >
> > val env = getBatchEnv
> > val ds = env.fromElements("a","b","c")
> >
> > val ds2 = ds.map(x=>{
> >   println("map op")
> >   x.charAt(0).toInt+1
> > })
> >
> > // This operation prints "map op" three times
> > ds2.print()
> >
> > // This operation prints "map op" another three times
> > ds2.filter(_>100).print()
> >
>


Re: Error in a custom flink sqlsubmit program

2021-01-13 Post by Jark Wu
From the error message it looks like a timeout; check whether the network between the client and the JM is working properly.

On Sun, 10 Jan 2021 at 16:23, Fei Han 
wrote:

> Hi all!
>
> Following the sqlsubmit example written by 云邪 for submitting SQL files, I modified it and submitted a job. The SQL file is recognized and the tables can be created, but when the insert job is submitted, it fails in local mode.
> The Flink version is 1.12.0. My submit command is: $FLINK_HOME/bin/flink run -mip:8081 -d  -p 3 -c
> sql.submit.SqlSubmit $SQL_JAR  -f $sql_file
> In local mode the error is as follows:
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Failed to execute sql
>  at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
>  at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>  at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>  at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
>  at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
>  at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
>  at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>  at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
>  at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:696)
>  at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:759)
>  at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:665)
>  at sql.submit.SqlSubmit.callInsertInto(SqlSubmit.java:169)
>  at sql.submit.SqlSubmit.callCommand(SqlSubmit.java:89)
>  at sql.submit.SqlSubmit.run(SqlSubmit.java:64)
>  at sql.submit.SqlSubmit.main(SqlSubmit.java:35)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
>  ... 11 more
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job '
> behavior'.
>  at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1951)
>  at
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
>  at
> org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
>  at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:680)
>  ... 22 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit JobGraph.
>  at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:366)
>  at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>  at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>  at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:361)
>  at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  at
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1168)
>  at
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>  at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:549)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at
> 

Re: Row function cannot have column reference through table alias

2021-01-13 Post by Jark Wu
This is a known issue and will be fixed in a later version. As a temporary workaround, you can construct the row directly as (b.app_id, b.message), without the ROW keyword.
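A sketch of that workaround applied to the query from this thread (only the row constructor changes):

INSERT INTO flink_log_sink
SELECT
    b.id,
    (b.app_id, b.message)
FROM flink_log_source a
JOIN flink_log_side b
ON a.id = b.id;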

On Mon, 11 Jan 2021 at 11:17, 刘海  wrote:

> Using table_alias.field_name inside ROW causes a parse error. I had the same problem with hbase before; I usually wrap the query in a subquery.
>
>
> | |
> 刘海
> |
> |
> liuha...@163.com
> |
> Signature customized by NetEase Mail Master
> On 1/11/2021 11:04,马阳阳 wrote:
> We have a SQL statement that composes a row from a table’s columns. The simplified
> SQL is like:
> INSERT INTO flink_log_sink
> SELECT
> b.id,
> Row(b.app_id, b.message)
> FROM flink_log_source a
> join flink_log_side b
> on a.id = b.id;
>
>
> When we submit the sql to Flink, the sql cannot be parsed, with the
> following error message:
> org.apache.flink.table.api.SqlParserException: SQL parse failed.
> Encountered "." at line 11, column 8.
> Was expecting one of:
> ")" ...
> "," ...
>
> at
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
> at
> cn.imdada.bi.dfl2.core.operation.InsertIntoOperation.execute(InsertIntoOperation.java:35)
> at cn.imdada.bi.dfl2.core.Main.execute(Main.java:172)
> at cn.imdada.bi.dfl2.core.Main.main(Main.java:125)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at
> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:153)
> at
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
> at
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:112)
> at
> cn.imdada.bi.dfl2.launcher.yarn.YarnPerJobSubmitter.submit(YarnPerJobSubmitter.java:37)
> at cn.imdada.bi.dfl2.launcher.LauncherMain.main(LauncherMain.java:127)
> Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered
> "." at line 11, column 8.
> Was expecting one of:
> ")" ...
> "," ...
>
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:442)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:205)
> at
> org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
> at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:155)
> at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:180)
> at
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
> ... 15 more
> Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered
> "." at line 11, column 8.
> Was expecting one of:
> ")" ...
> "," ...
>
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:39525)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:39336)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:24247)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:19024)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:18680)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:18721)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:18652)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectExpression(FlinkSqlParserImpl.java:11656)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectItem(FlinkSqlParserImpl.java:10508)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SelectList(FlinkSqlParserImpl.java:10495)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:7115)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:684)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:18635)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:18089)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:558)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.RichSqlInsert(FlinkSqlParserImpl.java:5709)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3342)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3882)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:253)
> at 

Re: Reading Kafka metadata with flink sql

2021-01-13 Post by Jark Wu
For reading Kafka key fields, see:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#key-fields
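A minimal sketch of the key-fields options from that page (the topic, server address, and field names here are made-up examples):

CREATE TABLE events (
  user_id STRING,
  event   STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'json',
  'key.fields' = 'user_id',
  'value.format' = 'json'
);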

On Wed, 13 Jan 2021 at 15:18, JasonLee <17610775...@163.com> wrote:

> hi
>
> Did you set the headers when writing the data? If not, of course they are empty.
>
>
>
> -
> Best Wishes
> JasonLee
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Re: SQL Client parallelism setting issue

2020-12-31 Post by Jark Wu
Currently the SQL CLI does not support configuring parameters from flink-conf.yaml; it only supports the table module's own parameters.
You can use SET table.exec.resource.default-parallelism = xx to configure the job parallelism, for example:
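(A sketch in the SQL CLI; the value 4 and the statement are only examples:)

SET table.exec.resource.default-parallelism = 4;
-- the next job submitted from this session then runs with parallelism 4
INSERT INTO my_sink SELECT * FROM my_hive_table;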

On Thu, 31 Dec 2020 at 17:13, jiangjiguang719 
wrote:

> By setting table.exec.hive.infer-source-parallelism=false I can now control the parallelism, but I observe the following:
> 1. In both streaming and batch mode, the parallelism can only be controlled by
> parallelism.default in flink-conf.yaml, which is a global setting, so the parallelism cannot be configured per job.
> 2. Setting parallelism in sql-client-defaults.yaml has no effect, and setting
> parallelism.default
> or parallelism in the SQL Client has no effect either.
>
> So how can the parallelism of an individual job be controlled effectively?
>
> On 2020-12-31 15:21:36, "Jark Wu" wrote:
> >In batch mode:
> >
> >1. The Hive source infers its parallelism, which is determined by the number of files. You can also disable the inference with
> >table.exec.hive.infer-source-parallelism=false,
> > in which case the job parallelism is used. Or set a maximum inferred parallelism via
> >table.exec.hive.infer-source-parallelism.max. [1]
> >2. Same as above.
> >3. This should have nothing to do with max-parallelism; it is likely because you did not configure a max slot count. The source requests too much parallelism and YARN
> >cannot provide that many resources in time, so it times out.
> >   Configuring slotmanager.number-of-slots.max prevents batch jobs from requesting resources without limit.
> >
> >Best,
> >Jark
> >
> >[1]:
> >https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/hive/hive_read_write.html#source-parallelism-inference
> >[2]:
> >https://ci.apache.org/projects/flink/flink-docs-master/deployment/config.html#slotmanager-number-of-slots-max
> >
> >On Thu, 31 Dec 2020 at 14:56, jiangjiguang719 
> >wrote:
> >
> >> Flink 1.12, submitting jobs with the SQL Client that read a Hive table. I have some questions about parallelism; here is what I observe:
> >> In flink-conf.yaml:
> >>   taskmanager.numberOfTaskSlots: 1   takes effect
> >>   parallelism.default: 1   has no effect; the actual job parallelism = the number of files of the Hive table, and <= 160
> >> In sql-client-defaults.yaml:
> >>   execution:
> >> parallelism: 10   has no effect
> >> max-parallelism: 16   when the number of files of the Hive table exceeds this value, it reports insufficient resources: Deployment took more
> >> than 60 seconds. Please check if the requested resources are available in
> >> the YARN cluster
> >> Questions:
> >> 1. How do I set the parallelism when submitting a job with the SQL Client?
> >> 2. Why does the parallelism parameter have no effect?
> >> 3. Why does the deployment fail when the number of Hive files exceeds max-parallelism?
>
>
>
>
>


Re: SQL Client parallelism setting issue

2020-12-30 Post by Jark Wu
In batch mode:

1. The Hive source infers its parallelism, which is determined by the number of files. You can also disable the inference with
table.exec.hive.infer-source-parallelism=false,
 in which case the job parallelism is used. Or set a maximum inferred parallelism via
table.exec.hive.infer-source-parallelism.max. [1]
2. Same as above.
3. This should have nothing to do with max-parallelism; it is likely because you did not configure a max slot count. The source requests too much parallelism and YARN
cannot provide that many resources in time, so it times out.
   Configuring slotmanager.number-of-slots.max prevents batch jobs from requesting resources without limit (see the sketch after the reference links below).

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/hive/hive_read_write.html#source-parallelism-inference
[2]:
https://ci.apache.org/projects/flink/flink-docs-master/deployment/config.html#slotmanager-number-of-slots-max
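A sketch of the relevant settings (the concrete values are only examples).

In the SQL Client session, to disable or cap the Hive source parallelism inference:

SET table.exec.hive.infer-source-parallelism = false;
SET table.exec.hive.infer-source-parallelism.max = 32;

And in flink-conf.yaml, to cap the total slots a batch job may request:

slotmanager.number-of-slots.max: 32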

On Thu, 31 Dec 2020 at 14:56, jiangjiguang719 
wrote:

> Flink 1.12, submitting jobs with the SQL Client that read a Hive table. I have some questions about parallelism; here is what I observe:
> In flink-conf.yaml:
>   taskmanager.numberOfTaskSlots: 1   takes effect
>   parallelism.default: 1   has no effect; the actual job parallelism = the number of files of the Hive table, and <= 160
> In sql-client-defaults.yaml:
>   execution:
> parallelism: 10   has no effect
> max-parallelism: 16   when the number of files of the Hive table exceeds this value, it reports insufficient resources: Deployment took more
> than 60 seconds. Please check if the requested resources are available in
> the YARN cluster
> Questions:
> 1. How do I set the parallelism when submitting a job with the SQL Client?
> 2. Why does the parallelism parameter have no effect?
> 3. Why does the deployment fail when the number of Hive files exceeds max-parallelism?


Re: Does Flink SQL 1.11 support writing data into Hive?

2020-12-11 post by Jark Wu
Documentation for 1.11:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_read_write.html

Documentation for 1.12:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/

On Fri, 11 Dec 2020 at 15:45, yinghua...@163.com  wrote:

> The official site says it is supported:
>
> But among the connectors I cannot find Hive, *should it be JDBC?*
>
>


Re: Error when doing a window aggregation on a computed column converted to timestamp

2020-12-11 post by Jark Wu
Please share the complete code; the current information is not enough to analyze the problem.

On Fri, 11 Dec 2020 at 11:53, jun su  wrote:

> hi Danny,
>  I tried that and got the same error. Debugging shows that LogicalWindowAggregateRuleBase does not carry the Expr information down
> when building the window, only the alias, which makes the subsequent optimization rule fail and exit.
>
> Danny Chan wrote on Fri, 11 Dec 2020 at 11:47:
> >
> > Have you tried adding the watermark syntax?
> >
> > jun su wrote on Fri, 11 Dec 2020 at 10:47:
> >
> > > hi all,
> > >
> > > Flink 1.11.0: using a computed column to convert a BIGINT field to timestamp and then running a time window aggregation fails with the following error:
> > >
> > > ddl:
> > >
> > > CREATE TABLE source(
> > > occur_time BIGINT,
> > > rowtime AS longToTimestamp(occur_time)
> > > ) WITH ('connector' = 'filesystem','format' = 'orc','path' =
> > > '/path/to/data')
> > >
> > > Error message:
> > >
> > > Caused by: java.lang.IllegalArgumentException: field [$f0] not found;
> > input
> > > fields are: [occur_time]
> > > at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:402)
> > > at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:385)
> > > at
> > >
> > >
> >
> org.apache.flink.table.planner.plan.utils.AggregateUtil$.timeFieldIndex(AggregateUtil.scala:720)
> > > at
> > >
> > >
> >
> org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecWindowAggregateRule.transformTimeSlidingWindow(BatchExecWindowAggregateRule.scala:161)
> > > at
> > >
> > >
> >
> org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecWindowAggregateRule.onMatch(BatchExecWindowAggregateRule.scala:111)
> > > at
> > >
> > >
> >
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:217)
> > > ... 27 more
> > >
> > > --
> > > Best,
> > > Jun Su
> > >
> >
>
>
> --
> Best,
> Jun Su
>


Re: Re: Re: Questions about using a UDAF on a retract stream

2020-12-09 post by Jark Wu
You can think of an upsert-kafka table as a real-time materialized view of a MySQL table,
where code is the key and amount is the value. When you update amount from 0 to 100 and then to 200,
the final result of sum(amount) is naturally 200.

If you want 0 -> 100 -> 300, it means you do not want to treat the data as a changelog keyed by the pk but as independent records. In that case declare the table with the plain kafka
connector and do not define a primary key, i.e. treat it as an ordinary log.

As for your UDAF problem, it is most likely an issue in your implementation, because your retract method sets the accumulator back to the retracted (previous) values.
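A minimal sketch of the "treat it as a plain log" alternative; the schema is reduced to the two relevant columns, and the topic and server options are reused from the thread for illustration only:

-- regular 'kafka' connector and no PRIMARY KEY: every record is kept,
-- so SUM(Amount) accumulates 0 -> 100 -> 300 instead of tracking only the latest value per key
CREATE TABLE market_stock_log (
  Code STRING,
  Amount BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'zzz',
  'properties.bootstrap.servers' = '10.0.3.20:9092,10.0.3.24:9092,10.0.3.26:9092',
  'properties.group.id' = 'sqltest46',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

SELECT Code, SUM(Amount) AS total_amount
FROM market_stock_log
GROUP BY Code;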

Best,
Jark



On Thu, 10 Dec 2020 at 15:04, bulterman <15618338...@163.com> wrote:

> Suppose for Code X the first record has X.Amount=0, the second X.Amount=100, and the third X.Amount=200.
> 1. Since Code is the primary key, the table only keeps the latest record for X, so select sum(X.Amount) from table outputs
> 0, 100, 200.
> 2. In my UDAF, for the same Code X the accumulate method always executes acc.lastAmount =
> Amount to update the accumulator state, but judging from the results, acc.lastAmount is 0 every time the method is entered for the same Code X?
> Is that also because the table only keeps one record for Code X?
>
>
> Then, with an upsert kafka table (only the latest record per Code X is kept), if I want to accumulate the
> Amount of Code X and expect the output 0, 100, 300, ..., how should I implement it?
> Thanks for any pointers!
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-12-10 13:47:57,"Jark Wu"  写道:
> >因为你的 upsert kafka table 的 pk 是 code,所以 code 分组下,数据已经是唯一的了 (一个 code
> >下,只有一行,取最后一行作为最新数据)。估计你同样的 code 下,amount 值是一样的,所以 sum(amount) 自然不会变化。
> >
> >Best,
> >Jark
> >
> >On Thu, 10 Dec 2020 at 12:36, bulterman <15618338...@163.com> wrote:
> >
> >> // kafka table
> >> tableEnv.execuetSql("CREATE TABLE  market_stock(\n" +
> >>
> >> "Code STRING,\n" +
> >>
> >> "Amount BIGINT,\n" +
> >>
> >> ..
> >>
> >> "PRIMARY KEY (Code) NOT ENFORCED\n" +
> >>
> >> ") WITH (\n" +
> >>
> >> "'connector' = 'upsert-kafka',\n" +
> >>
> >> "'topic' = 'zzz',\n" +
> >>
> >> "'properties.bootstrap.servers' = '10.0.3.20:9092,
> >> 10.0.3.24:9092,10.0.3.26:9092',\n" +
> >>
> >> "'properties.group.id' = 'sqltest46',\n" +
> >>
> >> "'key.format' = 'raw',\n" +
> >>
> >> "'value.format' = 'json'\n" +
> >>
> >> ")");
> >> // 使用UDAF计算
> >> Table table = bsTableEnv.sqlQuery("select
> >> Code,MainFundFlowFunc(Amount,AskPrice1,BidPrice1,Last) from market_stock
> >> GROUP BY Code");
> >> env.toRetractStream(table,Row.class).print();
> >>
> >>
> >> // UDAF的定义如下
> >> public class MainFundFlowFunc extends AggregateFunction<Row, AmountAccum> {
> >> @Override
> >> public Row getValue(AmountAccum acc) {
> >> Long mf = acc.ebb + acc.bb - acc.ebs - acc.bs;
> >> double mfr = acc.lastAmount > 0 ?
> >> MyNumericCalculator.divide(mf,acc.lastAmount,2).doubleValue() : 0.0;
> >> return Row.of(acc.ebs,acc.bs,acc.ms,acc.ss,acc.ebb,acc.bb
> ,acc.mb,
> >> acc.sb,mf,mfr);
> >> }
> >> @Override
> >> public AmountAccum createAccumulator() {
> >> return new AmountAccum();
> >> }
> >>
> >> public void accumulate(AmountAccum acc, Long amount, Double
> askPrice1,
> >> Double bidPrice1, Double last) {
> >> //..
> >>acc.lastAmount = amount;
> >> acc.lastAskPrice1 = askPrice1;
> >> acc.lastBidPrice1 = bidPrice1;
> >> }
> >> public void retract(AmountAccum acc, Long amount, Double askPrice1,
> >> Double bidPrice1, Double last) {
> >> acc.lastAmount = amount;
> >> acc.lastAskPrice1 = askPrice1;
> >> acc.lastBidPrice1 = bidPrice1;
> >> }
> >>
> >> }
> >>
> >>
> >>
> >>
> >> // acc
> >> public class AmountAccum {
> >> public Double lastAskPrice1;
> >> public Double lastBidPrice1;
> >>
> >> public Long lastAmount = 0L;
> >>
> >> public Long ebs = 0L;
> >>
> >> public Long bs = 0L;
> >>
> >> public Long ms = 0L;
> >>
> >> public Long ss = 0L;
> >>
> >> public Long ebb = 0L;
> >>
> >> public Long bb = 0L;
> >>
> >> public Long mb = 0L;
> >>
> >> public Long sb = 0L;
> >> }
> >>
> >>
> >> debug观察acc的lastAmount值,一直是0.
> >>
> >>
> >> 刚才试了一下用sum()函数,执行select Code,sum(Amount) from market_stock GROUP BY
> >> Code,发现并没有累加Amount字段的值,每一次输出都是最新的那个Amount值。
> >> 是我的使用姿势不对吗= =
> >>
> >> 在 2020-12-10 11:30:31,"Jark Wu"  写道:
> >> >可以发下代码吗?
> >> >
> >> >On Thu, 10 Dec 2020 at 11:19, bulterman <15618338...@163.com> wrote:
> >> >
> >> >> 上游是upsert-kafka connector 创建的table,
> >> >> 使用UDAF时发现accumlator里的变量一直没被更新?如果使用kafka connector是正常更新的
> >> >> (为了测试方便,table里只有同一个PK的数据)
> >>
>


Re: Re: Questions about using a UDAF on a retract stream

2020-12-09 post by Jark Wu
Because the pk of your upsert kafka table is code, the data is already unique within each code group (one
row per code, with the last row taken as the latest data). Presumably the amount values for the same code are identical, so sum(amount) naturally does not change.

Best,
Jark

On Thu, 10 Dec 2020 at 12:36, bulterman <15618338...@163.com> wrote:

> // kafka table
> tableEnv.execuetSql("CREATE TABLE  market_stock(\n" +
>
> "Code STRING,\n" +
>
> "Amount BIGINT,\n" +
>
> ..
>
> "PRIMARY KEY (Code) NOT ENFORCED\n" +
>
> ") WITH (\n" +
>
> "'connector' = 'upsert-kafka',\n" +
>
> "'topic' = 'zzz',\n" +
>
> "'properties.bootstrap.servers' = '10.0.3.20:9092,
> 10.0.3.24:9092,10.0.3.26:9092',\n" +
>
> "'properties.group.id' = 'sqltest46',\n" +
>
> "'key.format' = 'raw',\n" +
>
> "'value.format' = 'json'\n" +
>
> ")");
> // 使用UDAF计算
> Table table = bsTableEnv.sqlQuery("select
> Code,MainFundFlowFunc(Amount,AskPrice1,BidPrice1,Last) from market_stock
> GROUP BY Code");
> env.toRetractStream(table,Row.class).print();
>
>
> // UDAF的定义如下
> public class MainFundFlowFunc extends AggregateFunction<Row, AmountAccum> {
> @Override
> public Row getValue(AmountAccum acc) {
> Long mf = acc.ebb + acc.bb - acc.ebs - acc.bs;
> double mfr = acc.lastAmount > 0 ?
> MyNumericCalculator.divide(mf,acc.lastAmount,2).doubleValue() : 0.0;
> return Row.of(acc.ebs,acc.bs,acc.ms,acc.ss,acc.ebb,acc.bb,acc.mb,
> acc.sb,mf,mfr);
> }
> @Override
> public AmountAccum createAccumulator() {
> return new AmountAccum();
> }
>
> public void accumulate(AmountAccum acc, Long amount, Double askPrice1,
> Double bidPrice1, Double last) {
> //..
>acc.lastAmount = amount;
> acc.lastAskPrice1 = askPrice1;
> acc.lastBidPrice1 = bidPrice1;
> }
> public void retract(AmountAccum acc, Long amount, Double askPrice1,
> Double bidPrice1, Double last) {
> acc.lastAmount = amount;
> acc.lastAskPrice1 = askPrice1;
> acc.lastBidPrice1 = bidPrice1;
> }
>
> }
>
>
>
>
> // acc
> public class AmountAccum {
> public Double lastAskPrice1;
> public Double lastBidPrice1;
>
> public Long lastAmount = 0L;
>
> public Long ebs = 0L;
>
> public Long bs = 0L;
>
>     public Long ms = 0L;
>
> public Long ss = 0L;
>
> public Long ebb = 0L;
>
> public Long bb = 0L;
>
> public Long mb = 0L;
>
> public Long sb = 0L;
> }
>
>
> Debugging shows that acc.lastAmount is always 0.
>
>
> I just tried the sum() function with select Code,sum(Amount) from market_stock GROUP BY
> Code, and found the Amount values are not accumulated; each output is just the latest Amount value.
> Am I using it the wrong way?
>
> 在 2020-12-10 11:30:31,"Jark Wu"  写道:
> >可以发下代码吗?
> >
> >On Thu, 10 Dec 2020 at 11:19, bulterman <15618338...@163.com> wrote:
> >
> >> 上游是upsert-kafka connector 创建的table,
> >> 使用UDAF时发现accumlator里的变量一直没被更新?如果使用kafka connector是正常更新的
> >> (为了测试方便,table里只有同一个PK的数据)
>


Re: Questions about using a UDAF on a retract stream

2020-12-09 post by Jark Wu
Could you share the code?

On Thu, 10 Dec 2020 at 11:19, bulterman <15618338...@163.com> wrote:

> The upstream is a table created with the upsert-kafka connector.
> When using a UDAF, the variables in the accumulator are never updated; with the kafka connector they update normally.
> (For testing convenience, the table only contains data for a single PK.)


Re: How does the flink jdbc connector ensure that all buffered data is flushed to the database at checkpoint time?

2020-12-09 post by Jark Wu
Hi Jie,

This indeed looks like a problem.
The JdbcBatchingOutputFormat returned by the sink is not wrapped into a GenericJdbcSinkFunction.
Could you help create an issue for it?

Best,
Jark

On Thu, 10 Dec 2020 at 02:05, hailongwang <18868816...@163.com> wrote:

> Hi,
>    Yes, I think you are right.
>   `JdbcOutputFormat` is wrapped in `OutputFormatSinkFunction`, and
> `OutputFormatSinkFunction` does not implement `CheckpointedFunction`, so format.flush cannot be called in
> snapshotState.
>    WDYT @Jark @ Leonard
>
> Best,
> Hailong
>
>
> On 2020-12-09 17:13:14, "jie mei" wrote:
> >Hi, Community
> >
> >The JDBC connector does not seem to guarantee that buffered data is fully written to the database at checkpoint time. This is because
> >OutputFormat has no interface that is invoked at checkpoint time. From the code of the JDBC
> >connector, one can only configure a timeout for flushing data, so data loss is still possible.
> >
> >My question is: is there a way to force-flush the buffered data into the database?
> >
> >
> >@Public
> >public interface OutputFormat extends Serializable {
> >
> >   /**
> >* Configures this output format. Since output formats are
> >instantiated generically and hence parameterless,
> >* this method is the place where the output formats set their
> >basic fields based on configuration values.
> >* 
> >* This method is always called first on a newly instantiated output 
> > format.
> >*
> >* @param parameters The configuration with all parameters.
> >*/
> >   void configure(Configuration parameters);
> >
> >   /**
> >* Opens a parallel instance of the output format to store the
> >result of its parallel instance.
> >* 
> >* When this method is called, the output format it guaranteed to
> >be configured.
> >*
> >* @param taskNumber The number of the parallel instance.
> >* @param numTasks The number of parallel tasks.
> >* @throws IOException Thrown, if the output could not be opened
> >due to an I/O problem.
> >*/
> >   void open(int taskNumber, int numTasks) throws IOException;
> >
> >
> >   /**
> >* Adds a record to the output.
> >* 
> >* When this method is called, the output format it guaranteed to be 
> > opened.
> >*
> >* @param record The records to add to the output.
> >* @throws IOException Thrown, if the records could not be added to
> >to an I/O problem.
> >*/
> >   void writeRecord(IT record) throws IOException;
> >
> >   /**
> >* Method that marks the end of the life-cycle of parallel output
> >instance. Should be used to close
> >* channels and streams and release resources.
> >* After this method returns without an error, the output is
> >assumed to be correct.
> >* 
> >* When this method is called, the output format it guaranteed to be 
> > opened.
> >*
> >* @throws IOException Thrown, if the input could not be closed properly.
> >*/
> >   void close() throws IOException;
> >}
> >
> >
> >--
> >
> >*Best Regards*
> >*Jeremy Mei*
>
>
>
>
>
>


Re: With the flink sql cli and postgres-cdc: Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-12-09 post by Jark Wu
Tables backed by postgres-cdc only support reading, not writing.
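In other words, a postgres-cdc table can only appear on the source side; the write has to go through a connector that provides a sink factory, such as jdbc. A minimal sketch (all table names, columns and connection options are made up for illustration):

CREATE TABLE pg_orders (
  order_id BIGINT,
  amount DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'localhost',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'shop',
  'schema-name' = 'public',
  'table-name' = 'orders'
);

CREATE TABLE orders_copy (
  order_id BIGINT,
  amount DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/shop',
  'table-name' = 'orders_copy',
  'username' = 'postgres',
  'password' = 'postgres'
);

-- read with postgres-cdc, write with jdbc
INSERT INTO orders_copy SELECT * FROM pg_orders;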

On Wed, 9 Dec 2020 at 22:49, zhisheng  wrote:

> The sql client also needs to be restarted.
>
> 王敏超 wrote on Wed, 9 Dec 2020 at 16:49:
>
> > Using standalone mode, after starting the sql
> > cli I get the error below. But my lib directory does include flink-sql-connector-postgres-cdc-1.1.0.jar,
> > and I have restarted the cluster. The same approach works with mysql cdc.
> >
> > Caused by: org.apache.flink.table.api.ValidationException: Could not find
> > any factory for identifier 'postgres-cdc' that implements
> > 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the
> > classpath.
> >
> > Available factory identifiers are:
> >
> > blackhole
> > jdbc
> > kafka
> > print
> > ---
> >
> > So what have I misconfigured?
> >
> >
> >
> >
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>


Re: A group window expects a time attribute for grouping in a stream environment, thanks

2020-12-09 post by Jark Wu
The link was wrong, resending.

1) A time attribute is not simply a field of type timestamp; it is a special concept. See the documentation on how to declare time attributes:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
2) Your code also looks incorrect. L45: Table orders = tEnv.from("Orders"); I do not see you registering an "Orders"
table anywhere, so this line should not execute successfully.
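For illustration, a minimal sketch of an event-time attribute declared in DDL plus a group window that uses it; the schema is made up and datagen is only a stand-in for a real connector:

CREATE TABLE Orders (
  user_id STRING,
  amount DOUBLE,
  order_time TIMESTAMP(3),
  -- this WATERMARK declaration is what turns order_time into a time attribute
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen'
);

SELECT
  TUMBLE_START(order_time, INTERVAL '1' MINUTE) AS window_start,
  SUM(amount) AS total_amount
FROM Orders
GROUP BY TUMBLE(order_time, INTERVAL '1' MINUTE);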

Best,
Jark

On Thu, 10 Dec 2020 at 11:09, Jark Wu  wrote:

> 1) A time attribute is not simply a field of type timestamp; it is a special concept. See the documentation on how to declare time attributes:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
> 2) Your code also looks incorrect. L45: Table orders = tEnv.from("Orders"); I do not see you registering an "Orders"
> table anywhere, so this line should not execute successfully.
>
> Best,
> Jark
>
> On Wed, 9 Dec 2020 at 15:44, Appleyuchi  wrote:
>
>> 代码是:
>> https://paste.ubuntu.com/p/gVGrj2V7ZF/
>> 报错:
>> A group window expects a time attribute for grouping in a stream
>> environment.
>> 但是代码的数据源中已经有时间属性了.
>> 请问应该怎么修改代码?
>> 谢谢
>>
>>
>>
>>
>>
>>
>>
>
>


Re: A group window expects a time attribute for grouping in a stream environment, thanks

2020-12-09 post by Jark Wu
1) A time attribute is not simply a field of type timestamp; it is a special concept. See the documentation on how to declare time attributes:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
2) Your code also looks incorrect. L45: Table orders = tEnv.from("Orders"); I do not see you registering an "Orders"
table anywhere, so this line should not execute successfully.

Best,
Jark

On Wed, 9 Dec 2020 at 15:44, Appleyuchi  wrote:

> The code is:
> https://paste.ubuntu.com/p/gVGrj2V7ZF/
> The error:
> A group window expects a time attribute for grouping in a stream
> environment.
> But the data source in the code already has a time attribute.
> How should I modify the code?
> Thanks
>
>
>
>
>
>
>


Re: How to define the field type for JsonObject data in Flink SQL

2020-12-09 post by Jark Wu
Yes, 1.12.0 will be released within the next couple of days.

On Wed, 9 Dec 2020 at 14:45, xiao cai  wrote:

> Hi Jark
> sorry, I meant 1.12.0, that was a typo
>
>
>  Original Message
> Sender: Jark Wu
> Recipient: user-zh
> Date: Wednesday, Dec 9, 2020 14:40
> Subject: Re: How to define the field type for JsonObject data in Flink SQL
>
>
> Hi 赵一旦, the jackson component already handles this logic automatically.
> Hi xiaocai, which issue do you need 1.12.1 for? 1.12.0 will be released within the next couple of days.
> Best, Jark
> On Wed, 9 Dec 2020 at 14:34, xiao cai <flin...@163.com> wrote:
> > OK, I plan to upgrade and test next week. Also, when is 1.12.1 planned to be released?
> >
> > Original Message
> > Sender: Jark Wu
> > Recipient: user-zh <user-zh@flink.apache.org>
> > Date: Tuesday, Dec 8, 2020 13:41
> > Subject: Re: How to define the field type for JsonObject data in Flink SQL
> >
> > Declaring the field as STRING, as hailong suggested, is supported on the 1.12 release:
> > https://issues.apache.org/jira/browse/FLINK-18002
> > 1.12 will be released in the next couple of days; if you can upgrade, give it a try. Best, Jark
> > On Tue, 8 Dec 2020 at 11:56, wxpcc <wxp4...@outlook.com> wrote:
> > > You can use a string, or a custom String-type format and handle the inner structure with a UDF afterwards.
> > >
> > > --
> > > Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Choosing a state backend (FsStateBackend vs. RocksDBStateBackend) for N-way joins with flink cdc

2020-12-09 post by Jark Wu
Regarding RocksDB performance tuning, @Yun Tang knows it better.

On Thu, 10 Dec 2020 at 11:04, Jark Wu  wrote:

> For large state I still recommend rocksdb; it is much more stable in production. The gap you describe sounds quite large and is probably because rocksdb has not been tuned yet.
>
> You can refer to these articles and try tuning rocksdb:
>
> https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA
> https://mp.weixin.qq.com/s/mjWGWJVQ_zSVOgZqtjjoLw
> https://mp.weixin.qq.com/s/ylqK9_SuPKBKoaKmcdMYPA
> https://mp.weixin.qq.com/s/r0iPPGWceWkT1OeBJjvJGg
>
>
> Best,
> Jark
>
> On Wed, 9 Dec 2020 at 12:19, jindy_liu <286729...@qq.com> wrote:
>
>> 场景上:
>>
>>
>> 目前都是mysql里的带主键的表(亿级别)的join操作后,得到的实时宽表(视图)上做一些规则筛选或计数等,并且场景上状态(join算子)都基上上不设置TTL。
>>
>>
>> 目前mysql中的都是些有主键数据,且量级不会有太大的变化,并且可以预见,可能一年也就增加个200w左右,但表里的数据变更较频繁。所以处理上需要吞吐量较大,延时低。
>> 目前测试了一版本flink
>>
>> sql算子使用Rocksdb做后端发现吞吐与延时都比较大,一条数据变化,可能要10秒中才能生效,但换FsStateBackend时,速度就很快了,性能较好;两者相差10倍多。
>>
>>  所以产生以下想法,不知道可不可行?
>>
>>
>> 1、用FsStateBackend:想堆机器,把任务里的用到的状态都用FsStateBackend放内存来搞(基于表的主键数有限,状态应该也是有限的)。
>> 2、还用rocksdb:尽量把flink的manager内存(rocksdb用的)搞得较大,把读写rocksdb都尽量在内存中进行。
>> 目前对flink sql生成的算子的状态情况不太熟悉,想问题下这两种方法是否可行呢?
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>
>


Re: Choosing a state backend (FsStateBackend vs. RocksDBStateBackend) for N-way joins with flink cdc

2020-12-09 post by Jark Wu
For large state I still recommend rocksdb; it is much more stable in production. The gap you describe sounds quite large, probably because rocksdb has not been tuned yet.

You can refer to these articles and try tuning rocksdb:

https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA
https://mp.weixin.qq.com/s/mjWGWJVQ_zSVOgZqtjjoLw
https://mp.weixin.qq.com/s/ylqK9_SuPKBKoaKmcdMYPA
https://mp.weixin.qq.com/s/r0iPPGWceWkT1OeBJjvJGg


Best,
Jark

On Wed, 9 Dec 2020 at 12:19, jindy_liu <286729...@qq.com> wrote:

> The scenario:
>
>
> We join MySQL tables with primary keys (hundreds of millions of rows) and apply rule filtering and counting on the resulting real-time wide table (view); in this scenario the state of the join operators basically has no TTL configured.
>
>
> The MySQL data all has primary keys and its volume will not change much; foreseeably it may only grow by about 2 million rows a year, but the rows change frequently, so we need high throughput and low latency.
> We tested one version of flink
>
> sql with RocksDB as the state backend and found both throughput and latency poor: a single data change may take about 10 seconds to take effect, while with FsStateBackend it is fast and performs well; the difference is more than 10x.
>
> So we came up with the following ideas and wonder whether they are feasible:
>
>
> 1. Use FsStateBackend: add machines and keep all state used by the job in memory with FsStateBackend (the number of primary keys is bounded, so the state should be bounded too).
> 2. Stay with rocksdb: make Flink's managed memory (used by rocksdb) as large as possible so that rocksdb reads and writes stay in memory as much as possible.
> We are not very familiar with the state of the operators generated by flink sql; are these two approaches feasible?
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql 1.11 kafka cdc with a holo sink

2020-12-09 post by Jark Wu
1. Not supported yet. There is already an issue tracking it: https://issues.apache.org/jira/browse/FLINK-20385
2. Configure canal-json.table.include = 't1' to filter tables. Regex filtering is not supported yet.
3. Yes, it will.
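For point 2, a minimal sketch of a source table that only keeps the changelog entries of table t1; topic, servers and schema are illustrative:

CREATE TABLE t1_binlog (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'db_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'flink-sql',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'canal-json',
  -- only interpret changelog records whose table is 't1'
  'canal-json.table.include' = 't1'
);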

Best,
Jark

On Wed, 9 Dec 2020 at 11:33, 于洋 <1045860...@qq.com> wrote:

> In flink sql 1.11 I create a kafka source table; the kafka data is mysql binlog collected by canal, with 'format' =
> 'canal-json'. Questions:
> 1. Can the source table only contain the schema matching mysql (i.e. the fields inside data[{}]), or can fields like table and ts also be read?
> 2. When one topic contains binlog of multiple mysql tables, how does the kafka source table distinguish them; by differences in the schema?
> 3. When such a source table is combined with a holo sink table, will delete-type records delete the corresponding row in holo? 'ignoreDelete' =
> 'false'


Re: How to support double-quoted strings in flink 1.11 SQL

2020-12-09 post by Jark Wu
This is unrelated to that issue.

It sounds more like a Hive query compatibility requirement. You can follow FLIP-152: Hive Query Syntax Compatibility

https://cwiki.apache.org/confluence/display/FLINK/FLIP-152%3A+Hive+Query+Syntax+Compatibility

Best,
Jark

On Wed, 9 Dec 2020 at 11:13, zhisheng  wrote:

> Is it related to this issue? https://issues.apache.org/jira/browse/FLINK-20537
>
> 赵一旦 wrote on Wed, 9 Dec 2020 at 10:17:
>
> > Marking this thread to learn from it; waiting for replies.
> >
> > 莫失莫忘 wrote on Tue, 8 Dec 2020 at 18:49:
> >
> > > I am migrating hive sql to the flink engine. Many of the existing hive sql statements
> > > use double quotes for strings, e.g. select * from table1 where column1 =
> > > "word". How can I make flink SQL support double-quoted strings without modifying the SQL?
> > > PS: I see that strings in flink SQL must use single quotes, e.g. select * from table1 where column1
> > =
> > > 'word'. How can strings be allowed with both single and double quotes?
> >
>


Re: A timestamp issue when writing data to postgres with flink sql

2020-12-09 post by Jark Wu
From the error message, you did not insert into postgres; you only ran a select query, which displays the result in the sql client
UI and does not insert anything into postgres.

Your query contains timestamp with local time zone, and displaying query results of this type in the sql client
is not supported yet.

For a fix to this problem you can follow this issue: https://issues.apache.org/jira/browse/FLINK-17948

Best,
Jark

On Tue, 8 Dec 2020 at 19:32, 李轲  wrote:

> 报错信息:
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: Unexpected exception.
> This is a bug. Please consider filing an issue.
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
> Caused by: org.apache.flink.table.api.TableException: Unsupported
> conversion from data type 'TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL'
> (conversion class: java.time.Instant) to type information. Only data types
> that originated from type information fully support a reverse conversion.
> at
> org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:259)
> at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> at
> java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
> at
> org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.convertToRowTypeInfo(LegacyTypeInfoDataTypeConverter.java:329)
> at
> org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:237)
> at
> org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.java:49)
> at org.apache.flink.table.api.TableSchema.toRowType(TableSchema.java:271)
> at
> org.apache.flink.table.client.gateway.local.result.CollectStreamResult.(CollectStreamResult.java:71)
> at
> org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.(MaterializedCollectStreamResult.java:101)
> at
> org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.(MaterializedCollectStreamResult.java:129)
> at
> org.apache.flink.table.client.gateway.local.ResultStore.createResult(ResultStore.java:83)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:608)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:465)
> at
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:555)
> at
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:311)
> at java.util.Optional.ifPresent(Optional.java:159)
> at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212)
> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142)
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2020-12-08 19:24:43, "李轲" wrote:
> >The project needs to insert data into postgres. After using a catalog, the inserted data apparently has to match the database table definition exactly, and I could not find a way to insert only some of the fields.
> >Converting the time to TIMESTAMP(6) WITH LOCAL TIME ZONE raised an error; this format is the timestamp definition in postgres:
> >select cast(localtimestamp as TIMESTAMP(6) WITH LOCAL TIME ZONE);
> >Is there a conversion method, or a way to insert only part of the data?
>


Re: Help: how to integrate CDH HBase 2.0 with flink 1.11.2 on yarn

2020-12-09 post by Jark Wu
1. For the "hbase package not found" message, what is the exact exception stack?
2. Your steps also do not add the flink hbase connector jar to lib, which causes the hbase table factory not to be found.
3. Flink 1.11 did not yet ship an hbase 2.x connector jar.
4. Flink 1.12 supports hbase 2.x and should in principle also work with a flink 1.11 cluster.


So you can try downloading
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hbase-2.2_2.11/1.12.0/flink-sql-connector-hbase-2.2_2.11-1.12.0.jar
into flink/lib (this jar already shades the hbase jars), and then integrate hadoop with
HADOOP_CLASSPATH=`hadoop classpath`; that should work. See the 1.12 documentation for details [1].

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hbase.html





On Tue, 8 Dec 2020 at 17:40, site  wrote:

> Following the official approach, I successfully integrated hadoop with HADOOP_CLASSPATH=`hadoop classpath`.
> Since flink on yarn runs on a cdh6 cluster, I wanted to reuse the hbase libraries already on the classpath, using
>
> export
> HADOOP_CLASSPATH=/opt/cloudera/parcels/CDH/lib/hbase/lib/*:$HADOOP_CLASSPATH
>
>
> Then I created a flink runtime with yarn-session, connected to that container with the sql-client, and created an hbase mapping table. This approach failed: the diagnosis is that the hbase package cannot be found.
>
>
>
>
> ./bin/yarn-session.sh -d -s 4 -nm common-flink -jm 1024m -tm 4096m
>
> ./bin/sql-client.sh embedded -e conf/sql-env.yaml
>
>
>
>
> sql-env.yaml
>
> configuration:
>
>   execution.target: yarn-session
>
>
>
>
> Next I copied the hbase jars into flink_home/lib, and immediately fell into a deep rabbit hole:
>
> 尝试1.ClassNotFoundException: org.apache.hadoop.hbase.client.HTable
>
> 尝试2.ClassNotFoundException:
> org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListeningExecutorService
>
> 尝试3.ClassNotFoundException:
> org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos$MasterService$BlockingInterface
>
> 尝试4.复制hbase-shaded-client-2.1.0-cdh6.3.0.jar到lib,类冲突整个yarn-session都无法启动容器
>
> 尝试5\6\7.同3
>
> 尝试8\9.ClassNotFoundException:
> org.apache.hbase.thirdparty.com.google.protobuf.RpcController
>
> 尝试9.ClassNotFoundException:
> org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup
>
> 尝试10.NoSuchMethodError:
> org.apache.hadoop.hbase.client.HTable.getTableName()[B
>
> at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:272)
>
>
>
>
> Up to attempt 9 everything was resolved by copying jars into lib; the hbase dependency jars now in lib are:
>
> hbase-client-2.1.0-cdh6.3.0.jar
>
> hbase-common-2.1.0-cdh6.3.0.jar
>
> hbase-protocol-2.1.0-cdh6.3.0.jar
>
> hbase-protocol-shaded-2.1.0-cdh6.3.0.jar
>
> hbase-shaded-miscellaneous-2.2.1.jar
>
> hbase-shaded-netty-2.2.1.jar
>
> hbase-shaded-protobuf-2.2.1.jar
>
>
>
>
> For attempt 10, apart from modifying the source code, is there any other solution? Or is there a better way to integrate hbase?


Re: [flink-1.10.2] How to register the result DataStream of async IO as a table?

2020-12-09 post by Jark Wu
> tabEnv.createTemporaryView("test_table", result,

It looks like you did register it here, didn't you? Is any error reported?

Finally, remember to call StreamExecutionEnvironment.execute() to submit the job for execution.

Best,
Jark

On Tue, 8 Dec 2020 at 14:54, Tianwang Li  wrote:

> Flink version: 1.10.2
>
> Using RichAsyncFunction for async IO, the resulting DataStream cannot be registered as a table.
>
> In local tests the result keeps emitting the same data repeatedly.
>
> After processing the DataStream, how can it be registered as a Table?
>
> ---
> The code is as follows:
>
> // 异步redis处理
> RedisAsyncFunction asyncFunction = new RedisAsyncFunction(node,
> aggProcessorArgs);
>
> // 获取异步处理流
> DataStream result = AsyncDataStream.orderedWait(
> dataStream,
> asyncFunction,
> 60L,
> TimeUnit.SECONDS,
> 100).returns(outRowTypeInfo);
>
> // 注册为临时 table
> tabEnv.createTemporaryView("test_table", result,
> outRowFields.stream().collect(Collectors.joining(",")));
>
> //
> result.print("out_table>>");
>
> Table test_table = tabEnv.sqlQuery("select * from test_table");
> // 查询临时table
> tabEnv.toAppendStream(test_table, Row.class).print("test_table");
>
>
>
> --
> **
>  tili
> **
>


Re: How to define the field type for JsonObject data in Flink SQL

2020-12-08 post by Jark Wu
Hi 赵一旦,

The jackson component already handles this logic automatically.

Hi xiaocai,

 Which issue do you need 1.12.1 for? 1.12.0 is about to be released within the next couple of days.

Best,
Jark


On Wed, 9 Dec 2020 at 14:34, xiao cai  wrote:

> OK, I plan to upgrade and test next week. Also, when is 1.12.1 planned to be released?
>
>
>  Original Message
> Sender: Jark Wu
> Recipient: user-zh
> Date: Tuesday, Dec 8, 2020 13:41
> Subject: Re: How to define the field type for JsonObject data in Flink SQL
>
>
> Declaring the field as STRING, as hailong suggested, is supported on the 1.12 release:
> https://issues.apache.org/jira/browse/FLINK-18002
> 1.12 will be released in the next couple of days; if you can upgrade, give it a try. Best, Jark
>
> On Tue, 8 Dec 2020 at 11:56, wxpcc <wxp4...@outlook.com> wrote:
> > You can use a string, or a custom String-type format and handle the inner structure with a UDF afterwards.
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Reply: Can the default transactions of a relational database give Flink end-to-end exactly-once, or is a two-phase commit implementation required?

2020-12-07 post by Jark Wu
Two-phase commit against databases to guarantee exactly-once semantics is being worked on by the community; if you are interested, you can join the discussion under
https://issues.apache.org/jira/browse/FLINK-15578.

Best,
Jark

On Tue, 8 Dec 2020 at 09:14, hdxg1101300...@163.com 
wrote:

>
>
>
>
> hdxg1101300...@163.com
>
> From: hdxg1101300...@163.com
> Sent: 2020-12-07 18:40
> To: user-zh
> Subject: Reply: Re: Can the default transactions of a relational database give end-to-end exactly-once, or is a two-phase commit implementation required?
> You mean that if I implement the sink myself, throw all exceptions during the commit and roll back, end-to-end exactly-once is achievable; personally I think that works.
> I wanted to discuss it with someone, but there was nobody around, so I am asking the community.
>
>
>
> hdxg1101300...@163.com
>
> From: Leonard Xu
> Sent: 2020-12-07 17:00
> To: user-zh
> Subject: Re: Can the default transactions of a relational database give end-to-end exactly-once, or is a two-phase commit implementation required?
> Hi,
>
> > On 2020-12-07 at 16:46, hdxg1101300...@163.com wrote:
> >
> >Can the default transactions of a relational database give Flink end-to-end exactly-once, or does it require a two-phase commit implementation?
> >If I implement the sink myself, open a database transaction, and roll back and throw an exception on errors, can exactly-once be achieved?
>
> Flink
> can achieve end-to-end consistency when writing to a relational database. It is not supported by default; a two-phase commit is needed, and your approach is feasible. The community is also working on this feature[1]; there is already a PR you can refer to, and support is expected in 1.13.
>
> Best,
> Leonard
> [1] https://issues.apache.org/jira/browse/FLINK-15578 <
> https://issues.apache.org/jira/browse/FLINK-15578>
>


Re: How to define the field type for JsonObject data in Flink SQL

2020-12-07 post by Jark Wu
Declaring the field as STRING, as hailong suggested, is supported on the 1.12 release:
https://issues.apache.org/jira/browse/FLINK-18002

1.12 will be released within the next couple of days; if you can upgrade, give it a try.
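A minimal sketch of how that looks once on 1.12, assuming a value such as {"id": 1, "payload": {"a": 1, "b": "x"}}; topic and options are illustrative:

CREATE TABLE events (
  id BIGINT,
  -- the nested JSON object is kept as its raw JSON text
  payload STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'flink-sql',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);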

Best,
Jark

On Tue, 8 Dec 2020 at 11:56, wxpcc  wrote:

> You can use a string, or a custom String-type format and handle the inner structure with a UDF afterwards.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Cannot write Kafka data into Redis with RedisSink

2020-12-06 post by Jark Wu
This is probably related to the network and the deployment; I suggest contacting Huawei Cloud technical support.

On Sun, 6 Dec 2020 at 20:40, 赵一旦  wrote:

> It cannot connect. Have you confirmed that your Huawei Cloud environment can reach the redis server?
>
> 追梦的废柴  于2020年12月6日周日 下午8:35写道:
>
> > Hi all,
> > Good evening!
> > My project team is evaluating the Flink framework. One metric needs to read data from Kafka and store the final result in Redis.
> >
> >
> We added the flink-redis connector to the pom file and followed the official RedisSink example. During local development it writes to the Redis on a certain server just fine,
> > but after I package the program as a jar, deploy it to the server (Huawei Cloud MRS) and submit it to yarn with flink
> > run, it keeps reporting errors and cannot write to Redis. Does anyone know why?
> > The problem has blocked me for two days with no progress at all; I would appreciate your help. Thank you!
> > The error is as follows:
> > redis.client.jedis.exceptions.JedisConnectionException:Could not get a
> > resource from the pool at .
> >
> >
>


Re: The change log format of dynamic tables

2020-12-04 post by Jark Wu
They are complete records.

upsert kafka is implemented exactly this way, storing only the latest image.
But some queries do produce delete messages, so sometimes deletes still need to be stored; in upsert kafka they are stored as kafka
tombstone messages.

Best,
Jark

On Fri, 4 Dec 2020 at 17:00, jie mei  wrote:

> Hi, Community
>
> The change log of a Flink dynamic table has four kinds of messages (INSERT, UPDATE_BEFORE, UPDATE_AFTER,
> DELETE).
> Does UPDATE_BEFORE correspond to the complete record before the update, and UPDATE_AFTER to the complete record after the update?
> I deliberately emphasize "complete record" to confirm that the change log content is the full updated data, not an operation log / update statement like Set A = 'a'.
> Also, is the data of a Delete message a complete record or an operation log?
>
> That would mean a Table Sink only needs the INSERT and UPDATE_AFTER data to write to storage that does not support UPSERT,
> and obtaining the latest data via extra logic would be feasible.
>
> --
>
> *Best Regards*
> *Jeremy Mei*
>


Re: Why is calcite's implicit type conversion disabled?

2020-12-04 post by Jark Wu
The community has started the design discussion on Hive query syntax compatibility; you can follow it here:

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-152-Hive-Query-Syntax-Compatibility-td46928.html

Best,
Jark

On Fri, 4 Dec 2020 at 15:37, stgztsw  wrote:

> Since the community plans to be Hive-compatible, implicit conversion and the other hive syntax compatibility are really necessary. The hive
> sql running in real production environments is usually very complex, and with flink's current level of hive compatibility, most hive sql basically cannot run successfully. (Other gaps include no support for the != operator,
> create table as, and so on, which I will not enumerate here.) I hope the community can make the hive support more complete.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Problems encountered migrating production hive sql to the flink 1.11 engine

2020-12-04 post by Jark Wu
Hi,

Flink SQL 1.11 is not yet compatible with Hive SQL syntax. The design for this feature was only recently brought up for discussion in the community, with support expected in 1.13. You can follow the
design discussion here:

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-152-Hive-Query-Syntax-Compatibility-td46928.html


Best,
Jark

On Fri, 4 Dec 2020 at 11:45, 莫失莫忘  wrote:

> I recently tried switching the execution engine of a production hive sql job to flink 1.11.2 and found the following issues with flink 1.11's hive
> SQL support: 1. Double quotes are not supported for strings
> 2. != is not supported for inequality
> 3. Implicit type conversion is not supported
> 4. The split function is not supported
> 5. Hive is case-insensitive, flink is case-sensitive
> 6. The right table of a join cannot be a subquery (Calcite bug
> https://issues.apache.org/jira/browse/CALCITE-2152)
> 7. The as in create table table1 as select * from pokes; is not supported
>
>
>
> These are the problems found so far. Overall, flink 1.11's support for hive SQL statements is not yet sufficient to switch existing offline hive sql jobs directly to the flink engine.


Re: flink-cdc cannot read the binlog and the program reports no error

2020-12-04 post by Jark Wu
That does not sound right. There should at least be some error before the job fails. Or is there any exception in the TaskManager logs?

On Fri, 4 Dec 2020 at 09:23, chenjb  wrote:

>
> Thanks for following up. I had mapped int to bigint, assuming the larger bigint range would be compatible, without reading the docs carefully; mysql had error logs but the program reported no error, so my attention went to the mysql side. In the last couple of days I tried flink-cdc-sql and found that some error scenarios report nothing; for example, failing to connect to mysql (wrong password) also makes the program simply exit with
> 0, without any error or hint. Is there something else to configure? I am new to java and not very familiar with this.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: How to compute percentiles in real time with flink sql

2020-12-03 post by Jark Wu
You can look at the UDAF documentation:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#aggregate-functions
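Once such a UDAF is implemented following that documentation, registering and using it from SQL looks roughly like this; com.example.udf.PercentileFunction and the table/columns are hypothetical names, not an existing library:

-- register the Java aggregate function under a SQL name
CREATE TEMPORARY SYSTEM FUNCTION my_percentile
  AS 'com.example.udf.PercentileFunction' LANGUAGE JAVA;

-- then use it like any built-in aggregate
SELECT item_id, my_percentile(latency_ms, 0.95) AS p95_latency
FROM metrics
GROUP BY item_id;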




On Thu, 3 Dec 2020 at 12:06, 爱成绕指柔 <1194803...@qq.com> wrote:

> Hello:
>   Is there currently no percentile function in flink sql streaming computation? If not, how can this functionality be implemented?
>   Looking forward to your reply, thanks!


Re: Any recommendation for a visual client for flink?

2020-12-03 post by Jark Wu
You can try Zeppelin; its integration with flink sql is quite good.

Best,
Jark

On Thu, 3 Dec 2020 at 21:55, yinghua...@163.com  wrote:

> I did not state this clearly: I mean a flink sql client. We want to provide a client for other departments in the company; colleagues there only know some sql and lack programming skills.
>
> > On 3 Dec 2020 at 21:52, Shawn Huang wrote:
> >
> > What do you mean by client? By default Flink provides a Web UI on port 8081 where you can submit and cancel jobs and view logs and some basic metrics.
> >
> > Best,
> > Shawn Huang
> >
> >
> > yinghua...@163.com  于2020年12月3日周四 下午8:46写道:
> >
> >>
>


Re: flink-cdc cannot read the binlog and the program reports no error

2020-12-03 post by Jark Wu
Could it be unsigned int causing the trouble...

On Thu, 3 Dec 2020 at 15:15, chenjb  wrote:

> Case closed: the field types were not mapped according to the official requirements; after fixing the mapping it works fine.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL shared source question

2020-12-03 post by Jark Wu
1. Whether the source is shared can be seen from the topology graph in the web UI.
2. When catching up on data, or when downstream consumption speeds differ, uneven consumption across partitions is quite normal.
3. You can increase the sink parallelism and enlarge the buffer size to mitigate this problem.

Best,
Jark

On Wed, 2 Dec 2020 at 19:22, zz  wrote:

> Hi all:
> I have a job whose source table is built from one topic, but there are 6 sinks using multiple insert
> statements that write into the same mysql table. As I understand it, these insert statements
> should all share the source table, so kafka only needs to be read once. At runtime, however, some partitions of the kafka
> topic are consumed quickly while others are slow; what could be the reason?
> The topic has 18 partitions in total, and the job parallelism is 18.


Re: I defined a Kafka dynamic table in SQL-Client, but the kafka theme had some elements in the wrong format, so an exception was thrown in SQL-Client. Can we define the Kafka dynamic table with some

2020-12-03 post by Jark Wu
I think this is a bug; I have created an issue: https://issues.apache.org/jira/browse/FLINK-20470

On Wed, 2 Dec 2020 at 18:02, mr.meng...@ouglook.com 
wrote:

> <
> http://apache-flink.147419.n8.nabble.com/file/t1146/QQ%E6%88%AA%E5%9B%BE111.jpg>
>
>
> <
> http://apache-flink.147419.n8.nabble.com/file/t1146/%E6%97%A0%E6%A0%87%E9%A2%981211.png
> >
> Caused by: java.io.IOException: Failed to deserialize JSON ''.
> at
>
> org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:126)
> ~[flink-json-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:76)
> ~[flink-json-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
> ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
> ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> Caused by: java.lang.ClassCastException:
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.MissingNode
> cannot be cast to
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
>
>
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Why is calcite's implicit type conversion disabled?

2020-12-03 post by Jark Wu
Implicit conversion is a very important public API that requires careful discussion in the community, for example which types may be converted between.
The community has not planned this feature yet; if you need it, you can open an issue in the community.

Best,
Jark

On Wed, 2 Dec 2020 at 18:33, stgztsw  wrote:

> Currently neither flink sql nor flink hive
> sql supports implicit conversion. While debugging we found that calcite itself actually supports it, but flink forcibly disables it, whereas hive supports implicit conversion. This prevents our hive jobs from being migrated to flink. What is the reason for disabling it? Would enabling it on our side cause any problems?


Re: flink sql 1.11.1 appears to hit a bug

2020-12-03 post by Jark Wu
It looks like the job submission timed out and failed. Please confirm that
1. the flink cluster is up,
2. the sql client environment has network connectivity to the flink cluster,
3. sql-client-defaults.yaml is configured with the correct gateway-address (not needed for a local cluster).

Best,
Jark

On Wed, 2 Dec 2020 at 14:12, zzy  wrote:

> The problem is as follows: flink version 1.11.1, using flink sql in the sql client
>
>
> The sql statements are:
> CREATE TABLE sls_log_sz_itsp (
>   request STRING,
>   http_bundleId STRING,
>   upstream_addr STRING,
>   http_appid STRING,
>   bodyUserId STRING,
>   http_sequence STRING,
>   http_version STRING,
>   response_body STRING,
>   uri STRING,
>   bytes_sent STRING,
>   http_userId STRING,
>   http_cityId STRING,
>   http_user_agent STRING,
>   http_deviceType STRING,
>   record_time STRING,
>   rt AS TO_TIMESTAMP(DATE_FORMAT(record_time,'yyyy-MM-dd HH:mm:ss')),
>   WATERMARK FOR rt AS rt - INTERVAL '5' SECOND,
>   request_time STRING,
>   request_body STRING,
>   request_length STRING,
>   nginx_id STRING,
>   proxy_add_x_forwarded_for STRING,
>   http_deviceId STRING,
>   host STRING,
>   upstream_response_time STRING,
>   status STRING
> ) WITH (
>  'connector.type' = 'kafka',
>  'connector.version' = '0.11',
>  'connector.topic' = 'sls',
>  'connector.properties.zookeeper.connect' =
> 'hadoop85:2181,hadoop86:2181,hadoop87:2181',
>  'connector.properties.bootstrap.servers' =
> 'hadoop85:9092,hadoop86:9092,hadoop87:9092',
>  'connector.properties.group.id' = 'log-sz-itsp',
>  'connector.startup-mode' = 'latest-offset',
>  'format.type' = 'json'
> );
>
>
>
>  CREATE TABLE sz_itsp_test(
> request STRING,
> request_count BIGINT NOT NULL,
> window_end TIMESTAMP(3)
> ) WITH (
> 'connector.type' = 'jdbc',
> 'connector.url' =
> 'jdbc:mysql://hadoop85:3306/test?useSSL=false=true',
> 'connector.table' = 'sz_itsp_test',
> 'connector.driver' = 'com.mysql.jdbc.Driver',
> 'connector.username' = 'root',
> 'connector.password' = '00',
> 'connector.write.flush.max-rows' = '1',
> 'connector.write.flush.interval' = '2s',
> 'connector.write.max-retries' = '3'
> );
>
>
> INSERT INTO sz_itsp_test
> SELECT
>request,
>count(request) request_count,
>TUMBLE_END(rt, INTERVAL '5' MINUTE) AS window_end
>  FROM sls_log_sz_itsp
>  WHERE nginx_id = 'sz-itsp' AND nginx_id IS NOT NULL
>  GROUP BY TUMBLE(rt, INTERVAL '5' MINUTE), request
>  ;
>
>
> The sql client reports the following error:
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: Unexpected exception.
> This is a bug. Please consider filing an issue.
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
> Caused by: java.lang.RuntimeException: Error running SQL job.
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:608)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:529)
> at
> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:537)
> at
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:299)
> at java.util.Optional.ifPresent(Optional.java:159)
> at
> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200)
> at
> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
> at
> org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> JobGraph.
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:605)
> ... 8 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit JobGraph.
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359)
> at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> 

Re: Flink TableAPI Issue: cannot assign instance of org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.LRUMap to field

2020-12-03 post by Jark Wu
Check whether the flink version used to submit the job matches the flink version deployed on the yarn cluster.
Alternatively, there may be two different versions of the flink-shaded-jackson jar in your cluster.

On Wed, 2 Dec 2020 at 11:55, Zed  wrote:

> When I submitted a flink-table-sql job to yarn, the following exception
> came
> out. Wondering how to solve it. Anyone can help me with that? Appreciate
> it
>
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> instantiate user function.
> at
>
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:275)
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:126)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:459)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: cannot assign instance of
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.LRUMap
> to field
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DeserializerCache._cachedDeserializers
> of type java.util.concurrent.ConcurrentHashMap in instance of
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DeserializerCache
> at
>
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
> at
> java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1417)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
> at
>
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
> at
>
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
> at
>
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
> at
>
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
> at
>
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:260)
> 

Re: Converting a flink cdc sql result into a DataStream

2020-12-03 post by Jark Wu
The data in the row follows the fields and order defined in your schema; you can access values by index.

On Tue, 1 Dec 2020 at 13:59, jsqf  wrote:

> You can use this approach:
> DataStream dstream = tableEnv.toAppendStream(sourceTable,
> RowData.class);
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink sql client error java.net.NoRouteToHostException: No route to host

2020-12-03 post by Jark Wu
Try pinging localhost on your machine to see whether it is reachable.
Also check whether a network proxy is enabled locally; if so, try turning it off.

Best,
Jark

On Tue, 1 Dec 2020 at 09:38, 奚焘 <759928...@qq.com> wrote:

> I have just started learning flink. I downloaded and unpacked flink, started ./sql-client.sh embedded, and typed SELECT 'Hello
> World'; which reports an error
> Flink SQL> SELECT 'Hello World';
> [ERROR] Could not execute SQL statement. Reason:
> java.net.NoRouteToHostException: No route to host
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Collecting video streams from cameras

2020-12-03 post by Jark Wu
Yes, it is a good fit.

On Tue, 1 Dec 2020 at 09:37, Xia(Nate) Qu  wrote:

> A question for everyone:
>
>
> We want to build a video stream collection platform for multiple surveillance cameras, roughly 1000-5000 of them. The camera streams are sent directly to the platform, which can then write the data to Hadoop or make it available for machine learning. Is flink suitable for such a scenario? Thanks
>
>
> 屈夏
>


Re: How does flink cdc guarantee the correctness of group aggregation results?

2020-12-03 post by Jark Wu
Does your data source lack the full historical data, which is why the results do not match?

The general recommendation is to sync the full snapshot plus the incremental data into kafka and let flink consume the topic from the beginning.
Alternatively, the mysql-cdc connector [1] also provides snapshot-plus-incremental reading.
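For reference, a minimal sketch of reading the same table directly with the mysql-cdc connector, which performs the snapshot plus binlog reading for you; hostname, credentials and the exact schema are illustrative:

CREATE TABLE products (
  v_spu_id BIGINT,
  leaving_num INT,
  vipshop_price DECIMAL(10, 2),
  PRIMARY KEY (v_spu_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'mydb',
  'table-name' = 'products'
);

-- the aggregation then covers the full snapshot plus subsequent changes
SELECT v_spu_id AS spu_id, SUM(leaving_num * vipshop_price) AS leaving_price
FROM products
GROUP BY v_spu_id;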

Best,
Jark

[1]:
https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector


On Mon, 30 Nov 2020 at 22:54, kandy.wang  wrote:

> insert into kudu.default_database.index_agg
> SELECT v_spu_id as spu_id,sum(leaving_num*vipshop_price) as
> leaving_price,DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss')
> FROM  XX.XX.XX
> group by v_spu_id;
>
>
> XX.XX.XX consumes the company's cdc data source via a custom cdc
> format; the cdc data lives in kafka with only 7 days of retention and is consumed incrementally. How can result correctness be guaranteed?
> How should the initialization be done; does initialization mean loading the data into state? Reconciliation currently shows the record counts do not match.


Re: Does Flink SQL have a function similar to MySQL's group_concat?

2020-11-29 post by Jark Wu
I think you are looking for LISTAGG [1] which is more SQL standard
compliant.
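
Applied to the example in the quoted question, the query would look roughly like this:

SELECT Id, LISTAGG(Name, ',') AS resultName
FROM test
GROUP BY Id;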

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/systemFunctions.html

On Mon, 30 Nov 2020 at 11:41, zhengzhongni  wrote:

> Dear community,
> Hello!
>
>
> Does Flink SQL have functionality similar to MySQL's group_concat function?
> For example:
> Data:
> +--+---+
> | Id   | Name  |
> +--+---+
> |   10 | Larry |
> |   11 | Mike  |
> |   12 | John  |
> |   10 | Elon  |
> |   10 | Bob   |
> |   11 | Sam   |
> +--+———+
> Executed SQL: select Id,group_concat(Name SEPARATOR ',') as resultName from test
> group by Id;
> Expected result:
> +--+-+
> | Id   | resultName  |
> +--+-+
> |   10 | Larry,Elon,Bob  |
> |   11 | Mike,Sam|
> |   12 | John|
> +--++
>
>
> This is used in scenarios such as aggregating a user's recent login IPs or login devices. Does the current Flink SQL have a similar function?
>
>
> Looking forward to your reply, many thanks~~


Re: flink sql cdc writing data to mysql: related classes not found

2020-11-27 post by Jark Wu
Probably your flink-json conflicts with the flink-json already bundled in the framework; perhaps the flink-json version you added is not 1.11.x?

On Fri, 27 Nov 2020 at 19:03, cljb...@163.com  wrote:

> The related dependencies have been added; I do not know what causes the problem below, please help!
> The added dependencies are:
> flink-connector-mysql-cdc
> flink-format-changelog-json
> flink-json
>
> The error message is:
>
> java.util.ServiceConfigurationError:
> org.apache.flink.table.factories.Factory: Provider
> com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory could not
> be instantiated
> at java.util.ServiceLoader.fail(ServiceLoader.java:232)
> at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
> at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
> at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
> at java.util.Iterator.forEachRemaining(Iterator.java:116)
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:342)
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:221)
> at
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
> at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
> at com.jfbank.searchrec.main.XfkOsProducts.main(XfkOsProducts.java:31)
> Caused by: java.lang.NoClassDefFoundError:
> org/apache/flink/formats/json/JsonOptions
> at
> com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory.(ChangelogJsonFormatFactory.java:53)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at java.lang.Class.newInstance(Class.java:442)
> at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
> ... 25 common frames omitted
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.formats.json.JsonOptions
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 32 common frames omitted
>
>
>
> cljb...@163.com
>


Re: Re: Using canal-json with sharded databases and tables

2020-11-26 post by Jark Wu
1.12 is about to be released; there was not enough time to implement it for canal-json.

On Fri, 27 Nov 2020 at 10:49, air23  wrote:

> Hi, will the value.source.table feature of debezium-json also be implemented for canal-json in 1.12?
> Looking at the canal-json code, it seems part of that code is already there.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-11-27 10:13:37,"Jark Wu"  写道:
> >目前还不支持,读取 table 元信息。
> >在 1.12 中,debezium-json 支持了这种功能,文档[1], 代码[2]。
> >canal-json的话需要按照类似的方式支持下元信息读取。
> >
> >Best,
> >Jark
> >
> >[1]:
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#available-metadata
> >[2]:
> >
> https://github.com/apache/flink/blob/0a7c23cac26af49bce7c1f79fbf993c0a7f87835/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java#L136
> >
> >On Fri, 27 Nov 2020 at 09:49, air23  wrote:
> >
> >> 你好 我这边有很多这种场景,把分库分表的binlog 写入到同一个topic
> 然后用canal-json解析,这边想获取到json里面的table
> >> 字段,
> >>
> >> 然后根据 table名称加主键 写入到下游 合成一张表,写入到下游表,
> >>
> >> 然后发现 canal-json 是获取不到表名的,然后这边去修改canal-json的format。
> >>
> >> 添加 createJsonRowType方法的DataTypes.FIELD("table", DataTypes.STRING())
> >>
> >> 然后在deserialize方法里面把 table字段set到 data里面去。但是发现这种好像是不成功的 ,请问下
> 有什么具体点的思路提供下吗,
> >>
> >> 谢谢大佬们
>


Re: Using canal-json with sharded databases and tables

2020-11-26 post by Jark Wu
Reading the table metadata is not supported yet.
In 1.12, debezium-json supports this feature; see the documentation [1] and code [2].
For canal-json, metadata reading would need to be supported in a similar way.
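For reference, a minimal sketch of the 1.12 debezium-json metadata column mentioned above; topic and schema are illustrative:

CREATE TABLE topic_products (
  -- name of the MySQL table this change record came from
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'mysql_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'flink-sql',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);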

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#available-metadata
[2]:
https://github.com/apache/flink/blob/0a7c23cac26af49bce7c1f79fbf993c0a7f87835/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java#L136

On Fri, 27 Nov 2020 at 09:49, air23  wrote:

> Hi, we have many scenarios like this: the binlog of sharded databases/tables is written into the same topic and parsed with canal-json, and we want to get the table
> field from the json,
>
> then, based on the table name plus the primary key, write to a downstream table that merges them into one table.
>
> We found that canal-json cannot expose the table name, so we tried modifying the canal-json format:
>
> adding DataTypes.FIELD("table", DataTypes.STRING()) to the createJsonRowType method
>
> and then setting the table field into data inside the deserialize method. But that does not seem to work. Could you suggest a more concrete approach?
>
> Thanks!


Re: FlinkSQL causing Prometheus memory to spike

2020-11-26 post by Jark Wu
IIRC, runtime will truncate the operator name to max 80 characters, see
`TaskMetricGroup#METRICS_OPERATOR_NAME_MAX_LENGTH`.
You can search the log if there are "The operator name {} exceeded the {}
characters length limit and was truncated.".

On Thu, 26 Nov 2020 at 18:18, hailongwang <18868816...@163.com> wrote:

>
>
>
> Hi,
>  Yes, personally I think a configuration option could be provided to control the task name.
>  The full task name helps with troubleshooting, while a short task name helps with metric
> collection in production and can greatly reduce network overhead, storage space, and so on.
>  I have created an issue: https://issues.apache.org/jira/browse/FLINK-20375
>
>
> Best,
> Hailong
>
> On 2020-11-24 14:19:40, "Luna Wong" wrote:
> >The task_name in the metrics generated by FlinkSQL is extremely long, which makes Prometheus memory spike during queries; this is unacceptable in production.
> >Below is just the simplest example; complex SQL produces task_names up to 9000 bytes. This blows up Prometheus memory. What should I do?
> >
>
> >task_name="Source:_wuren_foo_ods_fooSourceConversion_table__Unregistered_DataStream_1___fields__id__name__SinkConversionToRowSourceConversion_table__default_catalog_default_database_ods_foo___fields__id__name__PROCTIME__Calc_select__id__name__SinkConversionToTuple2Sink:_Unnamed"
>


Re: flink 1.11.2 cdc: sink ordering of cdc sql result tables; restoring the job from a savepoint with a different parallelism leads to incorrect sink results!!

2020-11-26 文章 Jark Wu
Btw, I created an issue to track this problem:
https://issues.apache.org/jira/browse/FLINK-20374
Hope we can fix it in the next versions to have a better out-of-box
experience.

Best,
Jark

On Thu, 19 Nov 2020 at 13:58, Jark Wu  wrote:

> If the data itself is not skewed and the parallelism can be raised, then there is not much else to do on the sql side; tuning has to happen on the rocksdb side. For example:
> 1. Are you using SSDs?
> 2. Tune the write buffer and block cache
> 3. For more, see these state tuning articles [1][2].
>
> Best,
> Jark
>
> [1]: https://mp.weixin.qq.com/s/r0iPPGWceWkT1OeBJjvJGg
> [2]: https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA
>
> On Thu, 19 Nov 2020 at 12:19, jindy_liu <286729...@qq.com> wrote:
>
>> 很感谢jark!
>> 1、昨天将status表设置成时态表(Temporal
>> Tables),然后连续join试了下。确实有你说的问题,status表的更新不会触发任务计算,所有的数据实时变更需要test流来驱动。
>>
>> 同时时态表TTL设置问题,太小i/o有问题,太大结果不及时,与应用场景要求不符合,主要我们的场景下,status表也并不是维表,并且也数据量也大,变化也多。
>>
>> 2、并发度1的话,可以预见的是有主要性能问题,表大的情况下,join导致的反压厉害。
>>
>> 3、因为多并发度(10,20,40,80)测试中,我将join的两个表(test,
>> status)的数据完全对称,没有倾斜问题(200w,200w,并且join的key对称,test 1 -- status1, test 2 --
>> status2, test 200 -- status200),source仍然存在着反压,只是并发度高的反压慢点出现一些,
>> 这里的flink state后端用的是rokcsdb+本地文件。磁盘i/o看也就在2w block/s的,难道是默认的rokcsdb配置性能不够?
>>
>> 这个数据反压上,jark你有啥建议吗?
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>
>


Re: How to resolve the stall caused by no packet being sent in Flink CDC

2020-11-25 post by Jark Wu
That part is fine; after all there will not be that many jobs, and MySQL can handle that much load.

The description in the docs was not quite accurate: not configuring a server id does not put pressure on the MySQL connections. I have updated the docs.
https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector#set-a-differnet-server-id-for-each-job
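For the SQL connector, the per-job server id is set with the server-id table option; everything else in this sketch (schema, host, credentials) is illustrative:

CREATE TABLE orders (
  order_id BIGINT,
  order_status INT,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'mydb',
  'table-name' = 'orders',
  -- give each job its own id so it does not clash with other binlog clients
  'server-id' = '5401'
);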



On Thu, 26 Nov 2020 at 09:53, yujianbo <15205029...@163.com> wrote:

> Thanks for the answer, Jark. One more question: the community's mysql cdc wiki says that many different server
> ids connecting to the mysql server can cause mysql
> cpu and connection spikes. If our cdc uses sql with different server ids to pull different tables, does that also mean we should not have too many cdc jobs?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: How to resolve the stall caused by no packet being sent in Flink CDC

2020-11-25 post by Jark Wu
1. By default it is random, so it may collide.
2, 3. There is a problem: the server id is at the database (instance) level.

On Wed, 25 Nov 2020 at 17:41, yujianbo <15205029...@163.com> wrote:

> Mainly to parse a custom schema so that the sink side can output to downstream systems.
> I would like to ask a question:
>
> https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector#set-a-differnet-server-id-for-each-job
> I read the link above about setting a different server id for each job. I see that sql can specify different server ids, so I have these three questions:
> 1. Do different stream jobs share the same server id?
> 2. Is it fine for different stream jobs to sync different tables of the same database?
> 3. Is it a problem for different stream jobs to sync the same table of the same database?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql时间戳字段类型转换问题

2020-11-24 文章 Jark Wu
You can use the docker setup from this article:
https://flink.apache.org/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html
https://raw.githubusercontent.com/wuchong/flink-sql-demo/v1.11-EN/docker-compose.yml

The ts values produced inside that container are already in the SQL timestamp format.

1. What type should a time field in the above format be mapped to in Flink SQL?
TIMESTAMP WITH LOCAL TIME ZONE; the json format only supports it starting from 1.12.

2. Yes.

3. Flink does not support TIMESTAMP WITH TIME ZONE yet.
A value like 'yyyy-MM-dd HH:mm:ss' corresponds to TIMESTAMP, a timestamp without time zone
(internally a long value), while 'yyyy-MM-dd HH:mm:ssZ' corresponds to TIMESTAMP WITH LOCAL
TIME ZONE, a timestamp interpreted in the session time zone.
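
If you need this to work on 1.11 before the 1.12 json support mentioned above, a minimal workaround sketch (reusing the column names from the DDL quoted below) is to read ts as a string and derive the rowtime with REPLACE + TO_TIMESTAMP. Note that this simply drops the trailing 'Z', so the value is treated as a naive timestamp rather than as UTC:

CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts VARCHAR,
    -- strip the 'T' and 'Z' so the built-in TO_TIMESTAMP pattern matches
    rowtime AS TO_TIMESTAMP(REPLACE(REPLACE(ts, 'T', ' '), 'Z', '')),
    WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset'
);

This also sidesteps the single-quote escaping question in (2).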

Best,
Jark



On Wed, 25 Nov 2020 at 12:03, 陈帅  wrote:

> 数据源来自Jark项目 https://github.com/wuchong/flink-sql-submit
>  中的kafka消息,里面user_behavior消息例如
> {"user_id": "470572", "item_id":"3760258", "category_id": "1299190",
> "behavior": "pv", "ts": "2017-11-26T01:00:01Z"}
> 可以看到ts值是  '2017-11-26T01:00:00Z',现在要为它定义一张flink sql源表,如下
>
> CREATE TABLE user_log (
> user_id VARCHAR,
> item_id VARCHAR,
> category_id VARCHAR,
> behavior VARCHAR,
> ts TIMESTAMP(3),
> WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'user_behavior',
> 'properties.bootstrap.servers' = 'localhost:9092',
> 'properties.group.id' = 'testGroup',
> 'format' = 'json',
> -- 'json.timestamp-format.standard' = 'ISO-8601', // 不加这一行默认是'SQL'
> 'scan.startup.mode' = 'earliest-offset'
> );
>
> 程序运行会抛错
> Caused by: java.time.format.DateTimeParseException: Text
> '2017-11-26T01:00:00Z' could not be parsed at index 10
>
> 我查了一下flink json官方文档
>
> https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/table/connectors/formats/json.html#json-timestamp-format-standard
> 目前只支持两种格式:SQL 和 ISO-8601
> 其中SQL支持的格式是 '-MM-dd HH:mm:ss',
> 而ISO-8601支持的格式是 '-MM-ddTHH:mm:ss.s{precision}'
> 确实不支持上面的 '-MM-ddTHH:mm:ssZ' (注意末尾的Z)
>
> 请问:
> 1. 像上述时间格式字段在Flink SQL中应该解析成什么类型?
> 2. 如果不能直接支持的话是不是得先用VARCHAR类型接收,再利用 UNIX_TIMESTAMP(ts_string,
> pattern_string) 函数转成 支持的时间格式?可问题是 pattern_string
> 里面如果包含单引号要如何转义?UNIX_TIMESTAMP('2017-11-26T01:00:00Z',
> '-MM-dd'T'HH:mm:ss'Z'')?
> 3. TIMESTAMP WITH TIME ZONE和TIMESTAMP WITH LOCAL TIME
> ZONE这两种类型在什么情况下会用到?有例子吗?
>
> 谢谢!
>


Re: Flink CDC 遇到关于不发生packet造成的卡顿问题该如何解决

2020-11-24 文章 Jark Wu
Btw, may I ask why you use the Stream API instead of Flink SQL directly?

On Wed, 25 Nov 2020 at 00:21, Jark Wu  wrote:

> See the docs:
> https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector#setting-up-mysql-session-timeouts
>
> On Tue, 24 Nov 2020 at 23:54, yujianbo <15205029...@163.com> wrote:
>
>> 一、环境:
>> 1、版本:1.11.2
>> 2、flink CDC 用Stream  API 从mysql  同步到kudu
>>
>> 二、遇到的问题现象:
>> 1、目前线上已经同步了几张mysql表到kudu了,mysql的量级都在3千万左右。
>>  但是有一张mysql表同步了几次都遇到一个问题:大概能判断在全量阶段,还没到增量阶段。
>>
>>
>> 错误日志在下面。目前想采取“autoReconnect=true”看看来避免,到是不应该加在哪个地方,看日志感觉加了这个参数也是治标不治本,重点是为啥不发送packet,造成了卡顿?
>>
>>  下面是具体报错:
>> ==
>> 2020-11-24 20:00:37,547 *ERROR io.debezium.connector.mysql.SnapshotReader
>> *
>> [] - Failed due to error: Aborting snapshot due to error when last running
>> 'SELECT * FROM `uchome`.`forums_post_12`': *The last packet successfully
>> received from the server was 39 milliseconds ago.  The last packet sent
>> successfully to the server was 6,772,615 milliseconds ago. is longer than
>> the server configured value of 'wait_timeout'. You should consider either
>> expiring and/or testing connection validity before use in your
>> application,
>> increasing the server configured values for client timeouts, or using the
>> Connector/J connection property 'autoReconnect=true' to avoid this
>> problem.*
>> org.apache.kafka.connect.errors.ConnectException: The last packet
>> successfully received from the server was 39 milliseconds ago.  The last
>> packet sent successfully to the server was 6,772,615 milliseconds ago. is
>> longer than the server configured value of 'wait_timeout'. You should
>> consider either expiring and/or testing connection validity before use in
>> your application, increasing the server configured values for client
>> timeouts, or using the Connector/J connection property
>> 'autoReconnect=true'
>> to avoid this problem. Error code: 0; SQLSTATE: 08S01.
>> at
>> io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
>>
>> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
>> at
>> io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
>>
>> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
>> at
>>
>> io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831)
>>
>> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> [?:1.8.0_231]
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> [?:1.8.0_231]
>> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
>> *Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: The last
>> packet successfully received from the server was 39 milliseconds ago.  The
>> last packet sent successfully to the server was 6,772,615 milliseconds
>> ago.
>> is longer than the server configured value of 'wait_timeout'. *You should
>> consider either expiring and/or testing connection validity before use in
>> your application, increasing the server configured values for client
>> timeouts, or using the Connector/J connection property
>> 'autoReconnect=true'
>> to avoid this problem.
>> at
>>
>> com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:174)
>>
>> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
>> at
>>
>> com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)
>>
>> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
>> ===
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
>


Re: Flink CDC 遇到关于不发生packet造成的卡顿问题该如何解决

2020-11-24 文章 Jark Wu
See the docs:
https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector#setting-up-mysql-session-timeouts
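
In short, the fix is to raise the MySQL server-side session timeouts, as the error message below also suggests. A rough sketch on the MySQL side (86400 seconds is only an illustrative value; interactive_timeout is included here as an assumption alongside wait_timeout, which the error explicitly names):

-- run on the MySQL server (requires the privilege to change global variables)
SET GLOBAL interactive_timeout = 86400;
SET GLOBAL wait_timeout = 86400;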

On Tue, 24 Nov 2020 at 23:54, yujianbo <15205029...@163.com> wrote:

> 一、环境:
> 1、版本:1.11.2
> 2、flink CDC 用Stream  API 从mysql  同步到kudu
>
> 二、遇到的问题现象:
> 1、目前线上已经同步了几张mysql表到kudu了,mysql的量级都在3千万左右。
>  但是有一张mysql表同步了几次都遇到一个问题:大概能判断在全量阶段,还没到增量阶段。
>
>
> 错误日志在下面。目前想采取“autoReconnect=true”看看来避免,到是不应该加在哪个地方,看日志感觉加了这个参数也是治标不治本,重点是为啥不发送packet,造成了卡顿?
>
>  下面是具体报错:
> ==
> 2020-11-24 20:00:37,547 *ERROR io.debezium.connector.mysql.SnapshotReader
> *
> [] - Failed due to error: Aborting snapshot due to error when last running
> 'SELECT * FROM `uchome`.`forums_post_12`': *The last packet successfully
> received from the server was 39 milliseconds ago.  The last packet sent
> successfully to the server was 6,772,615 milliseconds ago. is longer than
> the server configured value of 'wait_timeout'. You should consider either
> expiring and/or testing connection validity before use in your application,
> increasing the server configured values for client timeouts, or using the
> Connector/J connection property 'autoReconnect=true' to avoid this
> problem.*
> org.apache.kafka.connect.errors.ConnectException: The last packet
> successfully received from the server was 39 milliseconds ago.  The last
> packet sent successfully to the server was 6,772,615 milliseconds ago. is
> longer than the server configured value of 'wait_timeout'. You should
> consider either expiring and/or testing connection validity before use in
> your application, increasing the server configured values for client
> timeouts, or using the Connector/J connection property 'autoReconnect=true'
> to avoid this problem. Error code: 0; SQLSTATE: 08S01.
> at
> io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
>
> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
> at
> io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
>
> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
> at
> io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831)
>
> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_231]
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_231]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
> *Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: The last
> packet successfully received from the server was 39 milliseconds ago.  The
> last packet sent successfully to the server was 6,772,615 milliseconds ago.
> is longer than the server configured value of 'wait_timeout'. *You should
> consider either expiring and/or testing connection validity before use in
> your application, increasing the server configured values for client
> timeouts, or using the Connector/J connection property 'autoReconnect=true'
> to avoid this problem.
> at
>
> com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:174)
>
> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
> at
>
> com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)
>
> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
> ===
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: SQL Cli中找不到DeserializationSchemaFactory

2020-11-24 文章 Jark Wu
Sources and sinks defined in a YAML file are looked up through the old factory interface, while the debezium format
only implements the new interface, so it cannot be found.
There is also no plan to support the new interfaces in YAML, because the YAML approach has already been deprecated.
See this issue: https://issues.apache.org/jira/browse/FLINK-20260
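
A sketch of the DDL-based alternative, to be executed in the SQL client instead of the YAML table definition quoted below; the schema and connector options are copied from your YAML:

CREATE TABLE t_users (
    userId STRING,
    province STRING,
    city STRING,
    age INT,
    education STRING,
    jobType STRING,
    marriage STRING,
    sex STRING,
    interest STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'ods.userAnalysis.user_profile',
    'properties.bootstrap.servers' = 'hostname:9092',
    'properties.group.id' = 'flink-analysis',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'debezium-avro-confluent',
    'debezium-avro-confluent.schema-registry.url' = 'http://hostname:8081'
);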

Best,
Jark

On Tue, 24 Nov 2020 at 18:52, jy l  wrote:

> Hi:
> flink版本1.12.0:
>
> 我想在sql-client-defaults.yaml中配置一张表,配置如下:
>
> tables:
>
>   - name: t_users
>
> type: source-table
>
> connector:
>
> property-version: 1
>
> type: kafka
>
> version: universal
>
> topic: ods.userAnalysis.user_profile
>
> startup-mode: latest-offset
>
> properties:
>
> bootstrap.servers: hostname:9092
>
> group.id: flink-analysis
>
> format:
>
> type: debezium-avro-confluent
>
> property-version: 1
>
> debezium-avro-confluent.schema-registry.url: http://hostname:8081
>
> #schema-registry.url: http://hostname:8081
>
> schema:
>
> - name: userId
>
>   data-type: STRING
>
> - name: province
>
>   data-type: STRING
>
> - name: city
>
>   data-type: STRING
>
> - name: age
>
>   data-type: INT
>
> - name: education
>
>   data-type: STRING
>
> - name: jobType
>
>   data-type: STRING
>
> - name: marriage
>
>   data-type: STRING
>
> - name: sex
>
>   data-type: STRING
>
> - name: interest
>
>   data-type: STRING
>
>
>
>
> 我把相关的包都已经放到了lib目录下,启动sql cli时报错如下:
>
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: Unexpected exception.
> This is a bug. Please consider filing an issue.
>
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)
>
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
> Could not create execution context.
>
> at
>
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)
>
> at
>
> org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)
>
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
>
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)
>
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
>
> the classpath.
>
>
> Reason: Required context properties mismatch.
>
>
> The following properties are requested:
>
> connector.properties.bootstrap.servers=henghe66:9092
>
> connector.properties.group.id=flink-analysis
>
> connector.property-version=1
>
> connector.startup-mode=latest-offset
>
> connector.topic=ods.userAnalysis.user_profile
>
> connector.type=kafka
>
> connector.version=universal
>
> format.debezium-avro-confluent.schema-registry.url=
> http://192.168.101.43:8081
>
> format.property-version=1
>
> format.type=debezium-avro-confluent
>
> schema.0.data-type=VARCHAR(2147483647)
>
> schema.0.name=userId
>
> schema.1.data-type=VARCHAR(2147483647)
>
> schema.1.name=province
>
> schema.2.data-type=VARCHAR(2147483647)
>
> schema.2.name=city
>
> schema.3.data-type=INT
>
> schema.3.name=age
>
> schema.4.data-type=VARCHAR(2147483647)
>
> schema.4.name=education
>
> schema.5.data-type=VARCHAR(2147483647)
>
> schema.5.name=jobType
>
> schema.6.data-type=VARCHAR(2147483647)
>
> schema.6.name=marriage
>
> schema.7.data-type=VARCHAR(2147483647)
>
> schema.7.name=sex
>
> schema.8.data-type=VARCHAR(2147483647)
>
> schema.8.name=interest
>
>
> The following factories have been considered:
>
> org.apache.flink.formats.avro.AvroRowFormatFactory
>
> org.apache.flink.formats.csv.CsvRowFormatFactory
>
> org.apache.flink.formats.json.JsonRowFormatFactory
>
> at
>
> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
>
> at
>
> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
>
> at
>
> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>
> at
>
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
>
> at
>
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:289)
>
> at
>
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:171)
>
> at
>
> org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:61)
>
> at
>
> org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:63)
>
> at
>
> org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:74)
>
> at
>
> org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:391)
>
> at
>
> 

Re: Flink SQL的灵异事件----查询语句中增加表中的某个字段时就没法正常查询了。

2020-11-23 文章 Jark Wu
From the error message this looks like a bug. Which version are you using?
Please open an issue in JIRA for it.

Best,
Jark

On Tue, 24 Nov 2020 at 11:27, jy l  wrote:

> Hi:
> FlinkSQL我在使用时发生一件很诡异的事件。具体如下:
>
> 我的DDL:
> create table if not exists t_order(
> id int PRIMARY KEY comment '订单id',
> timestamps bigint comment '订单创建时间',
> orderInformationId string comment '订单信息ID',
> userId string comment '用户ID',
> categoryId int comment '商品类别',
> productId int comment '商品ID',
> price decimal(10,2) comment '单价',
> productCount int comment '购买数量',
> priceSum decimal(10,2) comment '订单总价',
> shipAddress string comment '商家地址',
> receiverAddress string comment '收货地址',
> ts AS TO_TIMESTAMP(FROM_UNIXTIME(timestamps/1000)),
> WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
> )with(
> 'connector' = 'kafka',
> 'format' = 'debezium-avro-confluent',
> 'debezium-avro-confluent.schema-registry.url' = 'http://手动打码ip:8081
> 
> ',
> 'topic' = 'ods.userAnalysis.order',
> 'properties.bootstrap.servers' = '手动打码ip:9092',
> 'properties.group.id' = 'flink-analysis',
> 'scan.startup.mode' = 'latest-offset'
> )
>
> 我在查询该表时,使用如下查询语句能够正常查询出来:
>
>- select * from t_order
>- select receiverAddress from t_order
>- select
>id,
>timestamps,
>orderInformationId,
>userId,
>categoryId,
>productId,
>price,
>productCount,
>priceSum,
>shipAddress
>from t_order
>
> 但是我在第三条语句中加上receiverAddress字段时,就查询不出来了,sql如下:
> select
> id,
> timestamps,
> orderInformationId,
> userId,
> categoryId,
> productId,
> price,
> productCount,
> priceSum,
> shipAddress,
> receiverAddress
> from t_order,
> 报错信息如下:
> Exception in thread "main" org.apache.flink.table.api.TableException: This
> calc has no useful projection and no filter. It should be removed by
> CalcRemoveRule.
> at
>
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:166)
> at
>
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:59)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:84)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
> at
>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
> at
>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:82)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
> at
>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
> ...
>
> receiverAddress这个字段明明是在我的DDL中具体申明了的,且单独查询也能出来。
> 这具体是什么原因呢?望各位大佬告知。
>
>
> 祝好!
>


Re: 关于Catalog的建议

2020-11-23 文章 Jark Wu
1. Yes, that works.
2. Yes. See the documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/use.html#use-catloag
3. Yes.

The Hive metastore catalog is exactly the general-purpose catalog officially provided by Flink (it can store tables of any connector type).
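
A short SQL sketch of the flow discussed above, with illustrative names (myhive for the registered HiveCatalog, db1 for the target database, hive_table for the persisted table; the Kafka options are placeholders):

-- source defined in the in-memory default catalog
CREATE TABLE default_catalog.default_database.kafka_src (
    id BIGINT,
    name STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'my_topic',                           -- placeholder
    'properties.bootstrap.servers' = 'kafka:9092',  -- placeholder
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);

-- switch to the Hive catalog and target database, then write
USE CATALOG myhive;
USE db1;
INSERT INTO hive_table
SELECT id, name FROM default_catalog.default_database.kafka_src;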

Best,
Jark


On Tue, 24 Nov 2020 at 10:58, admin <17626017...@163.com> wrote:

> Hi Rui Li,
> > FlinkSQL允许一个Session使用多个Catalog,所以Catalog的选择不是互斥的,可以混用。
>
> 一个job里面可以切换catalog的是吧,比如从读kafka中 写hive 的 db1.hive_table。
> 几个问题请教一下:
> 1.create kafka source 使用  memory catalog,hive table 使用hive catalog,这样是可以的吧
> 2.在sql里面切换catalog的语法是什么,在[1]里面没看到,是这样吗 USE CATALOG
> catalogName(default_catalog/hive_catalog)
>
> 3.在注册hivecatalog时,需要指定一个默认的database,比如指定了默认test,然后要写到db1的hive_table,是不是切换一下database即可。
> USE db1;
> 感谢
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/catalogs.html
>
> > 2020年11月23日 下午8:52,Rui Li  写道:
> >
> > Hi,
> >
> > FlinkSQL允许一个Session使用多个Catalog,所以Catalog的选择不是互斥的,可以混用。
> >
> > 关于你的两个问题:
> > 1. 我理解JDBC Catalog主要是为了方便用户查询JDBC的表,目前的实现应该基本是个只读的Catalog
> > [1],文档也许是可以说的更明确一些。
> > 2.
> >
> 我觉得要实现一个完整的、生产可用的元数据管理系统都不会太“简单”,能读写schema只是最基础的要求,是否支持并发访问、如何支持HA、如何保证元数据安全都是需要考虑的问题。而hive
> > metastore已经有比较多的人在用了,所以借助它来持久化元数据是个性价比比较高的选择。
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#postgres-database-as-a-catalog
> >
> > On Mon, Nov 23, 2020 at 7:41 PM 赵一旦  wrote:
> >
> >> 目前Flink提供memory、jdbc、hive这3种catalog。
> >> 感觉实际使用中,可以使用如下几种方案。
> >>
> >> (1)选择memory catalog,然后每次sql都带上自己的相关DDL。
> >> (2)选择某种catalog,支持“持久化”DDL定义,然后具体sql就不需要带上自己相关的DDL了。
> >>
> >> 方案1和方案2各有优缺点。
> >> 方案1的优点:
> >>比如sql1和sql2都只针kafka topic的部分时间段范围,这种情况某个kafka
> >>
> >>
> topic就不方便写死DDL(持久化),而应该每个SQL自带一个定义。(当然,使用方案2也是可以基于options的覆盖方式简化sql1和sql2自带DDL定义的语句的)
> >> 方案1的缺点:
> >>很明显,不支持“持久化”本身就是缺点,这也是方案2的优点。
> >>
> >> -然后,我的问题来了。
> >>
> >>
> 在Flink文档中,HiveCatalog写了其作用是作为flink表元数据,同时也是作为读取hive表元数据的接口。而在JdbcCatalog中没写其支持的表类型(Connect类型)。
> >> 问题1(如上)没针对每个catalog写清楚其支持的connector类型,即表类型。
> >>
> >>
> >>
> 问题2:能否提供一个更简单方便的支持持久化,且支持所有connector类型的catalog的实现。“简单”指的是比如通过Mysql/PostgreSQL什么的,再或者直接json文件作为存储都可以。“持久化”即可以持久化。
> >>
> >>
> >>
> 当然,考虑到hive这种元数据使用其他存储可能需要额外复杂的转化,我感觉至少应该搞个相对通用的catalog,比如支持(mysql表,kafka表(kafka元数据很简单,用mysql啥的肯定能存储吧),...)。
> >>
> >
> >
> > --
> > Best regards!
> > Rui Li
>
>


Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-23 文章 Jark Wu
You are still using the old connector, right? That is, 'connector.type' = 'jdbc'. It decides its working mode based on whether the query produces updates.
If you use the new connector, i.e. 'connector' = 'jdbc', its behavior is exactly the second behavior (2) you described.
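
A minimal sketch of the sink on the new connector, reusing the schema from the DDL quoted below (URL and credentials are placeholders); with the PRIMARY KEY declared it writes with upsert (insert-on-duplicate-key-update) semantics even when the query output is append-only:

CREATE TABLE flink_recent_pv_subid (
    `supply_id` STRING,
    `subid` STRING,
    `mark` STRING,
    `time` STRING,
    `pv` BIGINT,
    PRIMARY KEY (`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',                    -- note: new option key, not 'connector.type'
    'url' = 'jdbc:mysql://host:3306/db',     -- placeholder
    'table-name' = 'flink_recent_pv_subid',
    'username' = '...',
    'password' = '...'
);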

Best,
Jark

On Mon, 23 Nov 2020 at 17:14, 赵一旦  wrote:

> duplicate情况可能update pv =values(pv), 也可能 update pv = pv +
> values(pv),这个根据具体任务统计逻辑决定,所以直接交给用户去设置挺好的。
>
> 此外,当前jdbc connector中,对于source情况也支持自定义sql,我指参数connector.read.query'
> 。所以其实可以类似来个参数connector.write.sql',sql中通过?占位符指定字段啥的。
>
>
> 赵一旦  于2020年11月23日周一 下午5:09写道:
>
> > 总结下:
> > (1)group
> >
> by查询,输出为update流,配合mysqlSink可实现更新。(DDL中不需要定义update-mode,1.11中本身也不支持update-mode参数了)。
> > (2)group by+tumble window查询,输出为append流,配合mysqlSink不可实现更新。
> >
> >
> >
> 如上,如果是case(2)想要实现更新方式输出到mysql怎么办呢?感觉Flink不应该基于流的特征决定怎么输出,毕竟涉及到外部存储,实际情况和具体业务逻辑有关,仅根据流的特征决定怎么输出不合适。
> > 还不如基于MySQL表的DDL(update-mode参数),或者隐式根据是否定义了primary key决定是否update方式输出。
> >
> >
> > 说个假想的示例,如果我指定kafka从过去某个实际重新跑某个任务,这种情况部分输出肯定重复,用户一般都会希望duplicate
> update方式输出。
> > 甚至DDL中推荐可以搞个自定义on
> > duplicate的处理逻辑,因为有些大窗口(天级别)的统计,有时候还可能基于小窗口叠加产生,也是需要依靠mysql提供的on duplicate
> > update功能。
> >
> >
> >
> >
> > 赵一旦  于2020年11月23日周一 下午4:48写道:
> >
> >> 嗯。刚刚我做了个测试,简化,从kafka读入直接写mysql。
> >> 发现这种方式也不行,但是加了group by之后是可以的。
> >>
> >> (1)
> >> 所以说是否还需要query带有key的语义才行呢?
> >> 比如group by的结果是可能update的,并且基于group by key也指出了key。
> >>
> >> 那么group by + tumble window情况下,输出貌似是append的,这种情况是否可以使用upsert方式输出到mysql呢?
> >>
> >> (2)如JarkWu所说,是mysql表的DDL部分决定。
> >>
> >> 如上1和2,哪个对呢?当然我个人期望是2。但是如果是2的话,那么为什么从kafka读取直接写mysql情况不可以呢?
> >>
> >> Jark Wu  于2020年11月23日周一 下午4:28写道:
> >>
> >>> 这个页面就是我上面说的 旧版connector,已经被废弃了,所以侧边栏没有导航。
> >>>
> >>> 新版的 jdbc sink,会根据 ddl 上是否有 PK 来决定是否工作在 upsert 模式。
> >>>
> >>> Best,
> >>> Jark
> >>>
> >>> On Mon, 23 Nov 2020 at 15:39, 赵一旦  wrote:
> >>>
> >>> > 看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到
> >>> >
> >>> >
> >>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes
> >>> > 页面。
> >>> >
> >>> >
> >>> >
> >>>
> 1.11中定义mysql表的时候需不需要显示指定upodate-mode,毕竟同时支持append-mode和upsert-mode。还是说flink会自己去判定。
> >>> >
> >>> > Jark Wu  于2020年11月23日周一 下午3:32写道:
> >>> >
> >>> > > 请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。
> >>> > >
> >>> > > On Mon, 23 Nov 2020 at 15:21, 赵一旦  wrote:
> >>> > >
> >>> > > > 如下是Flink官方文档JBDC connector的部分内容。Key handling
> >>> > > > <
> >>> > > >
> >>> > >
> >>> >
> >>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
> >>> > > > >
> >>> > > >
> >>> > > > Flink uses the primary key that defined in DDL when writing data
> to
> >>> > > > external databases. The connector operate in upsert mode if the
> >>> primary
> >>> > > key
> >>> > > > was defined, otherwise, the connector operate in append mode.
> >>> > > >
> >>> > > > In upsert mode, Flink will insert a new row or update the
> existing
> >>> row
> >>> > > > according to the primary key, Flink can ensure the idempotence in
> >>> this
> >>> > > way.
> >>> > > > To guarantee the output result is as expected, it’s recommended
> to
> >>> > define
> >>> > > > primary key for the table and make sure the primary key is one of
> >>> the
> >>> > > > unique key sets or primary key of the underlying database table.
> In
> >>> > > append
> >>> > > > mode, Flink will interpret all records as INSERT messages, the
> >>> INSERT
> >>> > > > operation may fail if a primary key or unique constraint
> violation
> >>> > > happens
> >>> > > > in the underlying database.
> >>> > > >
> >>> > > > See CREATE TABLE DDL
> >>> > > > <
> >>> > > >
> >>> > >
> >>> >
> >>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
> >>> > > > >
> 

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-23 文章 Jark Wu
That page is the old connector I mentioned above; it has been deprecated, which is why there is no entry for it in the sidebar navigation anymore.

The new jdbc sink decides whether to work in upsert mode based on whether a PRIMARY KEY is declared in the DDL.

Best,
Jark

On Mon, 23 Nov 2020 at 15:39, 赵一旦  wrote:

> 看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes
> 页面。
>
>
> 1.11中定义mysql表的时候需不需要显示指定upodate-mode,毕竟同时支持append-mode和upsert-mode。还是说flink会自己去判定。
>
> Jark Wu  于2020年11月23日周一 下午3:32写道:
>
> > 请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。
> >
> > On Mon, 23 Nov 2020 at 15:21, 赵一旦  wrote:
> >
> > > 如下是Flink官方文档JBDC connector的部分内容。Key handling
> > > <
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
> > > >
> > >
> > > Flink uses the primary key that defined in DDL when writing data to
> > > external databases. The connector operate in upsert mode if the primary
> > key
> > > was defined, otherwise, the connector operate in append mode.
> > >
> > > In upsert mode, Flink will insert a new row or update the existing row
> > > according to the primary key, Flink can ensure the idempotence in this
> > way.
> > > To guarantee the output result is as expected, it’s recommended to
> define
> > > primary key for the table and make sure the primary key is one of the
> > > unique key sets or primary key of the underlying database table. In
> > append
> > > mode, Flink will interpret all records as INSERT messages, the INSERT
> > > operation may fail if a primary key or unique constraint violation
> > happens
> > > in the underlying database.
> > >
> > > See CREATE TABLE DDL
> > > <
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
> > > >
> > > for
> > > more details about PRIMARY KEY syntax.
> > >
> > >
> > > 这里也有一点,In append mode, Flink will interpret all records as INSERT
> > messages,
> > > the INSERT operation may fail if a primary key or unique constraint
> > > violation happens in the underlying database.  什么叫append
> > > mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?
> > >
> > > 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。
> > >
> > >
> > >
> > > 赵一旦  于2020年11月23日周一 下午3:02写道:
> > >
> > > > 补充sql:
> > > >
> > > > DDL:
> > > >
> > > > CREATE TABLE flink_recent_pv_subid
> > > > (
> > > > `supply_id` STRING,
> > > > `subid` STRING,
> > > > `mark`  STRING,
> > > > `time`  STRING,
> > > > `pv`BIGINT,
> > > > PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
> > > > ) WITH (
> > > >   'connector.type' = 'jdbc',
> > > >
> > > >   ..
> > > >
> > > > );
> > > >
> > > >
> > > > 查询SQL:
> > > >
> > > > INSERT INTO
> > > > flink_recent_pv_subid
> > > > SELECT
> > > > `sid`,
> > > > `subid`,
> > > > `mark`,
> > > > DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
> > > 'MMddHHmm') as `time`,
> > > > count(1) AS `pv`
> > > > FROM baidu_log_view
> > > > GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5'
> > MINUTE);
> > > >
> > > >
> > > > 赵一旦  于2020年11月23日周一 下午3:00写道:
> > > >
> > > >> @hailongwang 一样的。
> > > >>
> > > >> 有个情况说明下,我是tumble
> window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
> > > >> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
> > > >> flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> hailongwang <18868816...@163.com> 于2020年11月23日周一 下午2:39写道:
> > > >>
> > > >>> 数据库中主键的设置跟 primary key 定义的一样不?
> > > >>>
> > > >>>
> > > >>> Best,
> > > >>> Hailong
> > > >>> 在 2020-11-23 13:15:01,"赵一旦"  写道:
> > > >>> >如题,按照官方文档,当mysql表定义了primary key的时候,会使用UpsertTableSink,并且会使用insert
> on
> > > >>> >duplicate方式写入。
> > > >>> >
> > > &

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

2020-11-22 文章 Jark Wu
Please use the new jdbc connector. The old jdbc connector behaves a bit differently.

On Mon, 23 Nov 2020 at 15:21, 赵一旦  wrote:

> 如下是Flink官方文档JBDC connector的部分内容。Key handling
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
> >
>
> Flink uses the primary key that defined in DDL when writing data to
> external databases. The connector operate in upsert mode if the primary key
> was defined, otherwise, the connector operate in append mode.
>
> In upsert mode, Flink will insert a new row or update the existing row
> according to the primary key, Flink can ensure the idempotence in this way.
> To guarantee the output result is as expected, it’s recommended to define
> primary key for the table and make sure the primary key is one of the
> unique key sets or primary key of the underlying database table. In append
> mode, Flink will interpret all records as INSERT messages, the INSERT
> operation may fail if a primary key or unique constraint violation happens
> in the underlying database.
>
> See CREATE TABLE DDL
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
> >
> for
> more details about PRIMARY KEY syntax.
>
>
> 这里也有一点,In append mode, Flink will interpret all records as INSERT messages,
> the INSERT operation may fail if a primary key or unique constraint
> violation happens in the underlying database.  什么叫append
> mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?
>
> 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。
>
>
>
> 赵一旦  于2020年11月23日周一 下午3:02写道:
>
> > 补充sql:
> >
> > DDL:
> >
> > CREATE TABLE flink_recent_pv_subid
> > (
> > `supply_id` STRING,
> > `subid` STRING,
> > `mark`  STRING,
> > `time`  STRING,
> > `pv`BIGINT,
> > PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
> > ) WITH (
> >   'connector.type' = 'jdbc',
> >
> >   ..
> >
> > );
> >
> >
> > 查询SQL:
> >
> > INSERT INTO
> > flink_recent_pv_subid
> > SELECT
> > `sid`,
> > `subid`,
> > `mark`,
> > DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
> 'MMddHHmm') as `time`,
> > count(1) AS `pv`
> > FROM baidu_log_view
> > GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5' MINUTE);
> >
> >
> > 赵一旦  于2020年11月23日周一 下午3:00写道:
> >
> >> @hailongwang 一样的。
> >>
> >> 有个情况说明下,我是tumble window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
> >> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
> >> flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。
> >>
> >>
> >>
> >>
> >> hailongwang <18868816...@163.com> 于2020年11月23日周一 下午2:39写道:
> >>
> >>> 数据库中主键的设置跟 primary key 定义的一样不?
> >>>
> >>>
> >>> Best,
> >>> Hailong
> >>> 在 2020-11-23 13:15:01,"赵一旦"  写道:
> >>> >如题,按照官方文档,当mysql表定义了primary key的时候,会使用UpsertTableSink,并且会使用insert on
> >>> >duplicate方式写入。
> >>> >
> >>> >但我在使用中,发现报了 duplicate entry的错误。例如:
> >>> >Caused by: com.mysql.jdbc.exceptions.jdbc4.
> >>> >MySQLIntegrityConstraintViolationException: Duplicate entry
> >>> >'2036-feed_landing_box_news-2000-202011231405' for key 'uniq_ssmt'
> >>> >at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> >>> Method)
> >>> >at sun.reflect.NativeConstructorAccessorImpl.newInstance(
> >>> >NativeConstructorAccessorImpl.java:62)
> >>> >at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
> >>> >DelegatingConstructorAccessorImpl.java:45)
> >>> >at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >>> >at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
> >>> >at com.mysql.jdbc.Util.getInstance(Util.java:386)
> >>> >at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
> >>> >at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
> >>> >at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
> >>> >at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
> >>> >at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
> >>> >at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
> >>> >at
> >>> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement
> >>> >.java:2157)
> >>> >at
> com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> >>> >.java:2460)
> >>> >at
> com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> >>> >.java:2377)
> >>> >at
> com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> >>> >.java:2361)
> >>> >at com.mysql.jdbc.PreparedStatement.executeBatchedInserts(
> >>> >PreparedStatement.java:1793)
> >>> >
> >>> >(2)
> >>> >此外,还有个小奇怪点,202011231405的数据,其他该时间的数据都在06分输出了(我设置了1min的maxOutOfOrder)。
> >>> >但这个冲突的entry是在14.11分那一波才报错的。
> >>>
> >>
>


Re: FlinkSQL CDC 窗口分组聚合求助

2020-11-22 文章 Jark Wu
In that case, wouldn't a non-windowed aggregation with a 5s mini-batch meet your requirement?
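
A rough sketch of that suggestion; the source table and columns follow the t_order table referenced in the exception quoted below, and the result is assumed to go to an upsert sink (e.g. ES or jdbc with a primary key):

-- mini-batch options, set e.g. through TableConfig:
--   table.exec.mini-batch.enabled       = true
--   table.exec.mini-batch.allow-latency = 5 s
--   table.exec.mini-batch.size          = 5000
-- a plain (non-windowed) aggregation accepts the update/delete messages from CDC,
-- and with mini-batch enabled the result row is refreshed roughly every 5 seconds
SELECT SUM(priceSum) AS total_transaction_amount
FROM t_order;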

Best,
Jark

On Mon, 23 Nov 2020 at 13:16, jy l  wrote:

> 使用场景就是我们想用Flink 根据订单系统中的订单表,每5s钟计算一次总的交易金额。
> 目前我们的系统大致架构是mysql(debezium)>kafka--->flink>es
>
> Jark Wu  于2020年11月23日周一 上午10:35写道:
>
> > Flink SQL 的 window agg 目前不支持输入含有更新和删除消息。
> > 你可以使用非 window 聚合来代替。
> >
> > Btw,你可能说一下你的需求场景么? 为什么需要在  CDC 上做 window 操作呢?
> >
> > Best,
> > Jark
> >
> > On Mon, 23 Nov 2020 at 10:28, jy l  wrote:
> >
> > > Hi:
> > > 我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下:
> > > [image: image.png]
> > > [image: image.png]
> > > 分组计算的SQL如下:
> > > [image: image.png]
> > > 在执行计算时,报了如下异常:
> > > Exception in thread "main" org.apache.flink.table.api.TableException:
> > > GroupWindowAggregate doesn't support consuming update and delete
> changes
> > > which is produced by node TableSourceScan(table=[[default_catalog,
> > > default_database, t_order,
> watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1,
> > > 1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps,
> > > orderInformationId, userId, categoryId, productId, price, productCount,
> > > priceSum, shipAddress, receiverAddress])
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> > > at
> > >
> >
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> > > at scala.collection.immutable.Range.foreach(Range.scala:155)
> > > at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> > > at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> > > at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> > > at
> > >
> >
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> > >
> > > 我的初步理解是FlinkSQL 不支持这样CDC这样由插入、更新、删除的流的分组聚合。
> > > 那面对我这样的情况,该用什么方案来解决?
> > > 望知道的各位告知一下,感谢!
> > >
> > > 祝好
> > >
> > >
> >
>


Re: FlinkSQL CDC 窗口分组聚合求助

2020-11-22 文章 Jark Wu
I created an issue to track this feature: https://issues.apache.org/jira/browse/FLINK-20281

On Mon, 23 Nov 2020 at 10:35, Jark Wu  wrote:

> Flink SQL 的 window agg 目前不支持输入含有更新和删除消息。
> 你可以使用非 window 聚合来代替。
>
> Btw,你可能说一下你的需求场景么? 为什么需要在  CDC 上做 window 操作呢?
>
> Best,
> Jark
>
> On Mon, 23 Nov 2020 at 10:28, jy l  wrote:
>
>> Hi:
>> 我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下:
>> [image: image.png]
>> [image: image.png]
>> 分组计算的SQL如下:
>> [image: image.png]
>> 在执行计算时,报了如下异常:
>> Exception in thread "main" org.apache.flink.table.api.TableException:
>> GroupWindowAggregate doesn't support consuming update and delete changes
>> which is produced by node TableSourceScan(table=[[default_catalog,
>> default_database, t_order, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1,
>> 1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps,
>> orderInformationId, userId, categoryId, productId, price, productCount,
>> priceSum, shipAddress, receiverAddress])
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
>> at
>> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>> at scala.collection.immutable.Range.foreach(Range.scala:155)
>> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
>> at
>> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>>
>> 我的初步理解是FlinkSQL 不支持这样CDC这样由插入、更新、删除的流的分组聚合。
>> 那面对我这样的情况,该用什么方案来解决?
>> 望知道的各位告知一下,感谢!
>>
>> 祝好
>>
>>


Re: FlinkSQL CDC 窗口分组聚合求助

2020-11-22 文章 Jark Wu
Flink SQL's window aggregation currently does not support inputs that contain update and delete messages.
You can use a non-windowed aggregation instead.

Btw, could you describe your use case? Why do you need a window operation on top of CDC data?

Best,
Jark

On Mon, 23 Nov 2020 at 10:28, jy l  wrote:

> Hi:
> 我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下:
> [image: image.png]
> [image: image.png]
> 分组计算的SQL如下:
> [image: image.png]
> 在执行计算时,报了如下异常:
> Exception in thread "main" org.apache.flink.table.api.TableException:
> GroupWindowAggregate doesn't support consuming update and delete changes
> which is produced by node TableSourceScan(table=[[default_catalog,
> default_database, t_order, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1,
> 1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps,
> orderInformationId, userId, categoryId, productId, price, productCount,
> priceSum, shipAddress, receiverAddress])
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> at scala.collection.immutable.Range.foreach(Range.scala:155)
> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>
> 我的初步理解是FlinkSQL 不支持这样CDC这样由插入、更新、删除的流的分组聚合。
> 那面对我这样的情况,该用什么方案来解决?
> 望知道的各位告知一下,感谢!
>
> 祝好
>
>


Re: FlinkSQL 支持DDL时 补全字段并进行默认值设置吗?

2020-11-20 文章 Jark Wu
What do you mean by completing the fields? Do you have a concrete example? Do you mean automatically inferring the schema?

Best,
Jark

On Fri, 20 Nov 2020 at 17:09, 孟小鹏  wrote:

> 目前遇到一个痛点 FlinkSQL可以在DDL时 补全字段并设置默认值吗?这样少了去处理ETL的步骤
>
> 这快有考虑吗?


Re: Re: Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 Jark Wu
1. Not having the initial full snapshot data can indeed cause problems.

3. When your format parses an update, does it emit the before row or the after row first?
4. Is your data ordered within Kafka? That is, do all records of the same key end up in the same partition?

On Fri, 20 Nov 2020 at 12:46, kandy.wang  wrote:

>
>
>
>
>
>
> 1.是的。  这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。
>
> 2. 没有开启
>
>
>
>
> 在 2020-11-20 11:49:44,"Jark Wu"  写道:
> >实现上应该没什么问题。
> >
> >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的?
> >2. 是否开启 mini-batch了?
> >
> >Best,
> >Jark
> >
> >On Fri, 20 Nov 2020 at 11:44, kandy.wang  wrote:
> >
> >> hi Jark:
> >>
> >>
> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price
> >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
> >>
> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条
> update_before
> >> update_after,format逻辑是应该这么写的吧。
> >>
> >>
> >>
> >>
> >> 在 2020-11-19 23:13:19,"Jark Wu"  写道:
> >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
> >> >值的,以验证你的自定义 format 没有问题。
> >> >
> >> >Best,
> >> >Jark
> >> >
> >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
> >> >
> >> >> --mysql表
> >> >> CREATE TABLE IF NOT EXISTS
> `mysql_realtime_leaving_price_spu_index_agg`(
> >> >>`id` INT UNSIGNED AUTO_INCREMENT,
> >> >>`spu_id` BIGINT NOT NULL,
> >> >>`leaving_price`  DECIMAL(10, 5)
> >> >>PRIMARY KEY ( `id` ),
> >> >>unique key idx_spu_id (spu_id)
> >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
> >> >>
> >> >>
> >> >> --flink表
> >> >> CREATE TABLE
> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> >> (
> >> >>`spu_id` BIGINT ,
> >> >>`leaving_price`  DECIMAL(10, 5),
> >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED
> >> >> ) WITH (
> >> >>   'connector' = 'jdbc',
> >> >>'url' = 'jdbc:mysql://...',
> >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
> >> >>'username' = '...',
> >> >>'password' = '..'
> >> >> );
> >> >>
> >> >>
> >> >> --binlog 2mysql
> >> >>
> >> >> insert into
> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> >> >>
> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
> >> >>
> >> >> FROM hive.database.table
> >> >>
> >> >> group by v_spu_id;
> >> >>
> >> >>
> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
> >> >>
> >> >>
> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
> >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
> >> >> 有什么好的排查思路么?
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >>
>


Re: Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 Jark Wu
实现上应该没什么问题。

1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的?
2. 是否开启 mini-batch了?

Best,
Jark

On Fri, 20 Nov 2020 at 11:44, kandy.wang  wrote:

> hi Jark:
>
>
> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price
> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
>
> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before
> update_after,format逻辑是应该这么写的吧。
>
>
>
>
> 在 2020-11-19 23:13:19,"Jark Wu"  写道:
> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
> >值的,以验证你的自定义 format 没有问题。
> >
> >Best,
> >Jark
> >
> >On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
> >
> >> --mysql表
> >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
> >>`id` INT UNSIGNED AUTO_INCREMENT,
> >>`spu_id` BIGINT NOT NULL,
> >>`leaving_price`  DECIMAL(10, 5)
> >>PRIMARY KEY ( `id` ),
> >>unique key idx_spu_id (spu_id)
> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
> >>
> >>
> >> --flink表
> >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> (
> >>`spu_id` BIGINT ,
> >>`leaving_price`  DECIMAL(10, 5),
> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED
> >> ) WITH (
> >>   'connector' = 'jdbc',
> >>'url' = 'jdbc:mysql://...',
> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
> >>'username' = '...',
> >>'password' = '..'
> >> );
> >>
> >>
> >> --binlog 2mysql
> >>
> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> >>
> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
> >>
> >> FROM hive.database.table
> >>
> >> group by v_spu_id;
> >>
> >>
> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
> >>
> >>
> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
> >> 有什么好的排查思路么?
> >>
> >>
> >>
> >>
> >>
> >>
>


Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 Jark Wu
You can first run select * from hive.database.table; directly and check whether every field contains normal, correct
values and whether there are any nulls, to verify that your custom format is working properly.
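
For example, a couple of throwaway checks in the SQL client (replace hive.`database`.`table` with your actual path; the columns are the ones used in the aggregation quoted below):

-- eyeball the raw values
SELECT v_spu_id, leaving_num, price FROM hive.`database`.`table`;

-- continuously count rows where any field used in the aggregation is NULL
SELECT COUNT(*) AS bad_rows
FROM hive.`database`.`table`
WHERE v_spu_id IS NULL OR leaving_num IS NULL OR price IS NULL;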

Best,
Jark

On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:

> --mysql表
> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>`id` INT UNSIGNED AUTO_INCREMENT,
>`spu_id` BIGINT NOT NULL,
>`leaving_price`  DECIMAL(10, 5)
>PRIMARY KEY ( `id` ),
>unique key idx_spu_id (spu_id)
> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>
>
> --flink表
> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
>`spu_id` BIGINT ,
>`leaving_price`  DECIMAL(10, 5),
> PRIMARY KEY ( `spu_id`) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>'url' = 'jdbc:mysql://...',
>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>'username' = '...',
>'password' = '..'
> );
>
>
> --binlog 2mysql
>
> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>
> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>
> FROM hive.database.table
>
> group by v_spu_id;
>
>
> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>
>
> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
> 有什么好的排查思路么?
>
>
>
>
>
>


Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

2020-11-18 文章 Jark Wu
If the data itself is not skewed and you can scale up the parallelism, then there is not much more to do on the SQL side; you will have to tune from the RocksDB angle. For example:
1. Are you using SSDs?
2. Tune the write buffer and the block cache.
3. For more, see these state tuning articles [1][2].

Best,
Jark

[1]: https://mp.weixin.qq.com/s/r0iPPGWceWkT1OeBJjvJGg
[2]: https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA

On Thu, 19 Nov 2020 at 12:19, jindy_liu <286729...@qq.com> wrote:

> 很感谢jark!
> 1、昨天将status表设置成时态表(Temporal
> Tables),然后连续join试了下。确实有你说的问题,status表的更新不会触发任务计算,所有的数据实时变更需要test流来驱动。
>
> 同时时态表TTL设置问题,太小i/o有问题,太大结果不及时,与应用场景要求不符合,主要我们的场景下,status表也并不是维表,并且也数据量也大,变化也多。
>
> 2、并发度1的话,可以预见的是有主要性能问题,表大的情况下,join导致的反压厉害。
>
> 3、因为多并发度(10,20,40,80)测试中,我将join的两个表(test,
> status)的数据完全对称,没有倾斜问题(200w,200w,并且join的key对称,test 1 -- status1, test 2 --
> status2, test 200 -- status200),source仍然存在着反压,只是并发度高的反压慢点出现一些,
> 这里的flink state后端用的是rokcsdb+本地文件。磁盘i/o看也就在2w block/s的,难道是默认的rokcsdb配置性能不够?
>
> 这个数据反压上,jark你有啥建议吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

2020-11-18 文章 Jark Wu
I took another careful look at your problem. Your join key is the status id, so records are currently shuffled by the
status id to the different parallel instances of the join.
If the status id of a test row changes, the data of one test id will be processed by different join subtasks, which
means the test data is already out of order by then.
At that point, even adding a keyBy on the sink key downstream does not help anymore.

So when joining two CDC streams, be aware that the join key must not change; otherwise the join can only run with a parallelism of 1.
For your scenario, consider joining the status table as a lookup (dimension) table instead, because a lookup join does
not shuffle by the join key, so the test data stays in order.
The trade-off is that updates to the status table will no longer trigger recomputation into the sink table; only updates on the test table will trigger the computation and update the sink table.
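
A rough sketch of the lookup-join variant, assuming the status table can be exposed through a lookup-capable connector such as jdbc; table and column names follow the DDL and query quoted below, and the URL and credentials are placeholders:

-- status as a lookup (dimension) table backed by jdbc
CREATE TABLE status_dim (
    id INT,
    name STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/ai_audio_lyric_task',  -- placeholder
    'table-name' = 'status',
    'username' = '...',
    'password' = '...'
);

-- test keeps its proctime column; the lookup join does not reshuffle by t.status
SELECT t.id, t.name, t.`time`, t.status, s.name AS status_name
FROM test AS t
LEFT JOIN status_dim FOR SYSTEM_TIME AS OF t.proctime AS s
    ON t.status = s.id;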

Best,
Jark



On Mon, 16 Nov 2020 at 16:03, jindy_liu <286729...@qq.com> wrote:

> 1、试了下
>
> 在test表中增加一个proctime
>
> CREATE TABLE test (
> `id` INT,
> `name` VARCHAR(255),
> `time` TIMESTAMP(3),
> `status` INT,
> `proctime` AS PROCTIME(),
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = 'localhost',
>   'port' = '3306',
>   'username' = 'no_lock',
>   'password' = 'no_lock',
>   'database-name' = 'ai_audio_lyric_task',
>   'table-name' = 'test',
>   'debezium.snapshot.locking.mode' = 'none'
> );
>
> 写去重语句,
>
> INSERT into test_status_print
> SELECT r.id, r.name, r.`time`, r.`proctime`, r.status, r.status_name
> FROM (
> SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime) as
> rowNum
> FROM (
> SELECT t.* , s.name as status_name
> FROM test AS t
> LEFT JOIN status AS s ON t.status = s.id
> )
> )r WHERE rowNum = 1;
>
> 但提示报错,不支持:
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Deduplicate doesn't support
> consuming update and delete changes which is produced by node
> Join(joinType=[LeftOuterJoin], where=[(status = id0)], select=[id, name,
> time, status, proctime, id0, name0], leftInputSpec=[HasUniqueKey],
> rightInputSpec=[JoinKeyContainsUniqueKey])
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink cdc 多表关联处理延迟很大

2020-11-17 文章 Jark Wu
My guess is that the full snapshot data is too large while the join nodes process it too slowly, so the snapshot is not finished within the half hour and the checkpoint times out.
Note that no checkpoint can complete during the full snapshot phase. See point 4 of this article for details:
https://mp.weixin.qq.com/s/Mfn-fFegb5wzI8BIHhNGvQ

The article also mentions the workaround:

Workaround: configure the tolerable number of failed checkpoints and a restart strategy in flink-conf.yaml, as follows:

execution.checkpointing.interval: 10min   # checkpoint interval
execution.checkpointing.tolerable-failed-checkpoints: 100  # tolerable number of failed checkpoints
restart-strategy: fixed-delay  # restart strategy
restart-strategy.fixed-delay.attempts: 2147483647   # number of restart attempts
restart-strategy.fixed-delay.attempts: 2147483647   # 重试次数

Best,
Jark

On Wed, 18 Nov 2020 at 11:28, 丁浩浩 <18579099...@163.com> wrote:

> 即使我将not
> exists改成了join,join节点的checkpoint也无法完成,是我设置的checkpint时间太短了嘛,我设置的是每隔半小时发起checkpoint一次,超时时间也是半小时。
> 下面是截图,(我上传图片每次都看不了啥情况)
> https://imgchr.com/i/DeqixU
> https://imgchr.com/i/DeqP2T
>
> > 在 2020年11月16日,上午10:29,Jark Wu  写道:
> >
> > 瓶颈应该在两个 not exists 上面,not exists 目前只能单并发来做,所以无法水平扩展性能。
> > 可以考虑把 not exists 替换成其他方案,比如 udf,维表 join。
> >
> > Best,
> > Jark
> >
> > On Mon, 16 Nov 2020 at 10:05, 丁浩浩 <18579099...@163.com> wrote:
> >
> >> select
> >>ri.sub_clazz_number,
> >>prcrs.rounds,
> >>count(*) as num
> >> from
> >>subclazz gs
> >> JOIN
> >>(SELECT gce.number, min( gce.extension_value ) AS grade FROM
> >> course_extension gce WHERE gce.isdel = 0 AND gce.extension_type = 4
> GROUP
> >> BY gce.number) AS temp
> >> ON
> >>temp.number = gs.course_number AND temp.grade>30
> >> JOIN
> >>right_info ri
> >> ON
> >>gs.number = ri.sub_clazz_number
> >> join
> >>wide_subclazz ws
> >> on
> >>ws.number = ri.sub_clazz_number
> >> join
> >>course gc
> >> on
> >>gc.number = ws.course_number and gc.course_category_id in (30,40)
> >> left join
> >>performance_regular_can_renewal_sign prcrs
> >> on prcrs.order_number = ri.order_number andprcrs.rounds in (1,2)
> >> where ri.is_del = 0 and ri.end_idx = -1 and prcrs.rounds is not null
> >> and not exists (select 1 from internal_staff gis where gis.user_id =
> >> ri.user_id)
> >> and not exists (select 1 from clazz_extension ce where ws.clazz_number =
> >> ce.number
> >>and ce.extension_type = 3 and ce.isdel = 0
> >>and ce.extension_value in (1,3,4,7,8,11))
> >> group by ri.sub_clazz_number, prcrs.rounds
> >> Sql代码是这样的。
> >> 瓶颈在所有的join节点上,每次的checkpoint无法完成的节点都是join节点。
> >>
> >>> 在 2020年11月14日,下午5:53,Jark Wu  写道:
> >>>
> >>> 能展示下你的代码吗?是用的维表关联的语法 (FOR SYSTEM TIME AS OF)?
> >>> 需要明确下,到底是什么节点慢了。
> >>>
> >>> On Fri, 13 Nov 2020 at 19:02, 丁浩浩 <18579099...@163.com> wrote:
> >>>
> >>>> 我用flink cdc 对mysql的表进行关联查询,发现flink只能两张表关联之后再跟下一张表关联,导致最后落库的延迟非常大。
> >>>> 有没有比较好的优化方案能缓解这样的问题?
> >>
> >>
> >>
>
>
>


Re: Flink cdc 多表关联处理延迟很大

2020-11-17 文章 Jark Wu
Also, you can increase the parallelism of the join nodes further to improve the join throughput.

On Wed, 18 Nov 2020 at 11:34, Jark Wu  wrote:

> 估计是你的全量数据太大,但是 join 节点处理性能太慢,导致半小时内全量数据还没有处理完,而 checkpoint 超时了。
> 注意全量数据阶段,是做不了 checkpoint 的。 具体可以看下这篇文章的第四点。
> https://mp.weixin.qq.com/s/Mfn-fFegb5wzI8BIHhNGvQ
>
> 解决办法文中也有提及:
>
> 解决办法:在 flink-conf.yaml 配置 failed checkpoint 容忍次数,以及失败重启策略,如下:
>
> execution.checkpointing.interval: 10min   # checkpoint间隔时间
> execution.checkpointing.tolerable-failed-checkpoints: 100  # checkpoint
> 失败容忍次数
> restart-strategy: fixed-delay  # 重试策略
> restart-strategy.fixed-delay.attempts: 2147483647   # 重试次数
>
> Best,
> Jark
>
> On Wed, 18 Nov 2020 at 11:28, 丁浩浩 <18579099...@163.com> wrote:
>
>> 即使我将not
>> exists改成了join,join节点的checkpoint也无法完成,是我设置的checkpint时间太短了嘛,我设置的是每隔半小时发起checkpoint一次,超时时间也是半小时。
>> 下面是截图,(我上传图片每次都看不了啥情况)
>> https://imgchr.com/i/DeqixU
>> https://imgchr.com/i/DeqP2T
>>
>> > 在 2020年11月16日,上午10:29,Jark Wu  写道:
>> >
>> > 瓶颈应该在两个 not exists 上面,not exists 目前只能单并发来做,所以无法水平扩展性能。
>> > 可以考虑把 not exists 替换成其他方案,比如 udf,维表 join。
>> >
>> > Best,
>> > Jark
>> >
>> > On Mon, 16 Nov 2020 at 10:05, 丁浩浩 <18579099...@163.com> wrote:
>> >
>> >> select
>> >>ri.sub_clazz_number,
>> >>prcrs.rounds,
>> >>count(*) as num
>> >> from
>> >>subclazz gs
>> >> JOIN
>> >>(SELECT gce.number, min( gce.extension_value ) AS grade FROM
>> >> course_extension gce WHERE gce.isdel = 0 AND gce.extension_type = 4
>> GROUP
>> >> BY gce.number) AS temp
>> >> ON
>> >>temp.number = gs.course_number AND temp.grade>30
>> >> JOIN
>> >>right_info ri
>> >> ON
>> >>gs.number = ri.sub_clazz_number
>> >> join
>> >>wide_subclazz ws
>> >> on
>> >>ws.number = ri.sub_clazz_number
>> >> join
>> >>course gc
>> >> on
>> >>gc.number = ws.course_number and gc.course_category_id in (30,40)
>> >> left join
>> >>performance_regular_can_renewal_sign prcrs
>> >> on prcrs.order_number = ri.order_number andprcrs.rounds in (1,2)
>> >> where ri.is_del = 0 and ri.end_idx = -1 and prcrs.rounds is not null
>> >> and not exists (select 1 from internal_staff gis where gis.user_id =
>> >> ri.user_id)
>> >> and not exists (select 1 from clazz_extension ce where ws.clazz_number
>> =
>> >> ce.number
>> >>and ce.extension_type = 3 and ce.isdel = 0
>> >>and ce.extension_value in (1,3,4,7,8,11))
>> >> group by ri.sub_clazz_number, prcrs.rounds
>> >> Sql代码是这样的。
>> >> 瓶颈在所有的join节点上,每次的checkpoint无法完成的节点都是join节点。
>> >>
>> >>> 在 2020年11月14日,下午5:53,Jark Wu  写道:
>> >>>
>> >>> 能展示下你的代码吗?是用的维表关联的语法 (FOR SYSTEM TIME AS OF)?
>> >>> 需要明确下,到底是什么节点慢了。
>> >>>
>> >>> On Fri, 13 Nov 2020 at 19:02, 丁浩浩 <18579099...@163.com> wrote:
>> >>>
>> >>>> 我用flink cdc 对mysql的表进行关联查询,发现flink只能两张表关联之后再跟下一张表关联,导致最后落库的延迟非常大。
>> >>>> 有没有比较好的优化方案能缓解这样的问题?
>> >>
>> >>
>> >>
>>
>>
>>


Re: flink1.11 TableEnvironment 不支持注册 Aggregate Function?

2020-11-17 文章 Jark Wu
Construct a StreamTableEnvironmentImpl directly through its constructor with isStreamingMode = false.
Then you can call registerFunction on that instance.

On Wed, 18 Nov 2020 at 10:40, lingchanhu  wrote:

> 非常感谢,如果flink1.11 目前不支持的话,那对于这种场景的使用有什么建议么?想要批处理数据,其中又要用到自定义的agg function?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.11 TableEnvironment 不支持注册 Aggregate Function?

2020-11-17 文章 Jark Wu
Btw, the TableEnvironment#createTemporarySystemFunction interface supports
AggregateFunction in the 1.12 release.
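
On 1.12 that registration would look roughly like this; a sketch reusing the FirstSendMsgFunc class and table names from the code quoted below:

// Flink 1.12+: a plain (batch-mode) TableEnvironment accepts an AggregateFunction here.
tEnv.createTemporarySystemFunction("firSendMsgFunc", FirstSendMsgFunc.class);

tEnv.executeSql(
    "SELECT from_user, firSendMsgFunc(create_time) AS first_send_msg_today " +
    "FROM wx_event_log GROUP BY from_user").print();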

On Wed, 18 Nov 2020 at 10:34, Jark Wu  wrote:

> In 1.11, the TableEnvironment#createTemporarySystemFunction interface does not yet support
> AggregateFunction.
> You said it works with StreamTableEnvironment; I assume you used
> StreamTableEnvironment#registerFunction, which does support AggregateFunction.
>
> Best,
> Jark
>
>
> On Wed, 18 Nov 2020 at 09:49, lingchanhu  wrote:
>
>> *flink1.11*
>> When registering and using a custom Aggregate Function in a TableEnvironment, the error below is thrown. The code is pasted underneath. (Registering and using it in a StreamTableEnvironment works fine, which should mean the custom function itself is ok.)
>>
>> org.apache.flink.table.api.TableException: Aggregate functions are not
>> updated to the new type system yet.
>> at
>>
>> org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:152)
>> at
>>
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:183)
>> at
>>
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:112)
>> at
>>
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
>> at
>>
>> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
>> at
>>
>> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
>> at
>>
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83)
>> at
>> java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
>> at
>>
>> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>> at
>> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>> at
>>
>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>> at
>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>> at
>> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>> at
>> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>> at
>>
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84)
>> at
>>
>> org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211)
>> at java.util.function.Function.lambda$andThen$1(Function.java:88)
>> at
>>
>> org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178)
>> at
>>
>> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.getAggregate(OperationTreeBuilder.java:651)
>> at
>>
>> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:616)
>> at
>>
>> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:598)
>> at
>>
>> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
>> at
>>
>> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
>> at
>>
>> org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:511)
>> at
>>
>> org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685)
>> at
>>
>> com.ideacom.flink.demo.example.BatchTableExample.demo(BatchTableExample.java:48)
>> at com.ideacom.flink.demo.TableSqlJob.main(TableSqlJob.java:36)
>>
>> *// the code is below*
>> // main
>> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>> .useBlinkPlanner()
>> .inBatchMode()
>> .build();
>>
>> TableEnvironment tEnv = TableEnvironment.create(envSettings);
>>
>> // register the source table, a jdbc table source
>> tEnv.executeSql("CREATE TABLE wx_event_log () with
>> ('connect.type'='jdbc'),");
>>
>> // register the sink table, a csv table sink
>> tEnv.executeSql("CREATE TABLE wx_data_statistics () with
>> ('connect.type'='

Re: flink1.11 TableEnvironment does not support registering an Aggregate Function?

2020-11-17 Post by Jark Wu
In 1.11, the TableEnvironment#createTemporarySystemFunction interface does not yet support
AggregateFunction.
You said it works with StreamTableEnvironment; I assume you used
StreamTableEnvironment#registerFunction, which does support AggregateFunction.

Best,
Jark
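
A minimal sketch of that registerFunction path (streaming mode only, which is exactly the limitation this thread is about), reusing the FirstSendMsgFunc class from the code quoted below:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(
        env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

// Deprecated but still available on 1.11, and it accepts AggregateFunction.
tEnv.registerFunction("firSendMsgFunc", new FirstSendMsgFunc());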


On Wed, 18 Nov 2020 at 09:49, lingchanhu  wrote:

> *flink1.11*
> When registering and using a custom Aggregate Function in a TableEnvironment, the error below is thrown. The code is pasted underneath. (Registering and using it in a StreamTableEnvironment works fine, which should mean the custom function itself is ok.)
>
> org.apache.flink.table.api.TableException: Aggregate functions are not
> updated to the new type system yet.
> at
>
> org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:152)
> at
>
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:183)
> at
>
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:112)
> at
>
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
> at
>
> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
> at
>
> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
> at
>
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83)
> at
> java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
> at
>
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> at
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at
>
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at
>
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84)
> at
>
> org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211)
> at java.util.function.Function.lambda$andThen$1(Function.java:88)
> at
>
> org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178)
> at
>
> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.getAggregate(OperationTreeBuilder.java:651)
> at
>
> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:616)
> at
>
> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:598)
> at
>
> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
> at
>
> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
> at
>
> org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:511)
> at
>
> org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685)
> at
>
> com.ideacom.flink.demo.example.BatchTableExample.demo(BatchTableExample.java:48)
> at com.ideacom.flink.demo.TableSqlJob.main(TableSqlJob.java:36)
>
> *// the code is below*
> // main
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inBatchMode()
> .build();
>
> TableEnvironment tEnv = TableEnvironment.create(envSettings);
>
> // register the source table, a jdbc table source
> tEnv.executeSql("CREATE TABLE wx_event_log () with
> ('connect.type'='jdbc'),");
>
> // register the sink table, a csv table sink
> tEnv.executeSql("CREATE TABLE wx_data_statistics () with
> ('connect.type'='filesystem','format.type'='csv',.)");
>
> // register the agg function
> tEnv.createTemporarySystemFunction("firSendMsgFunc",new
> FirstSendMsgFunc());
>
> Table table2 = tEnv.sqlQuery("select from_user,create_time from
> wx_event_log
> where msg_type='text' and create_time between '2020-03-20' and
> '2020-03-21'");
>
> table2.groupBy($("from_user"))
>
>
> .aggregate(call("firSendMsgFunc",$("create_time")).as("first_send_msg_today"))
> .select($("from_user"),$("first_send_msg_today"))
> .executeInsert("wx_data_statistics");
>
>
> // custom agg function class
> public class FirstSendMsgFunc extends
> AggregateFunction {
>
> public void accumulate(CountDTO acc, LocalDateTime createTime) {
> if (acc.getDateTime() == null) {
> acc.setDateTime(createTime);
> } else if (acc.getDateTime().isAfter(createTime)) {
> 

Re: How should event time be configured for this data format?

2020-11-17 Post by Jark Wu
What error do you get?


On Tue, 17 Nov 2020 at 23:43, 赵一旦  wrote:

> CREATE TABLE user_log
> (
> d MAP,
> process_time AS PROCTIME(),
> event_time AS
> TO_TIMESTAMP(FROM_UNIXTIME(COALESCE(d['server_time'], 0) / 1000)),
> WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
> ) WITH (
> 'connector' = 'kafka',
>
>  ...
>
> );
>
> As above, this fails with an error; it seems this usage is not supported. But my data format is exactly like this, for example:
>
> {
>
>   "d": {
>
> "name": "abc",
>
> "age": 12
>
>   }
>
> }
>


Re: Flink JSON deserialization loses DECIMAL precision

2020-11-17 Post by Jark Wu
Let's continue the discussion in the issue. As suggested there, you could try switching to the new connector with 'connector' = 'kafka' and see whether that helps.
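
For reference, the new connector syntax looks roughly like this; topic, server, and column names are placeholders, and whether it avoids the precision loss is exactly what the linked issue is tracking:

tEnv.executeSql(
    "CREATE TABLE orders (" +
    "  order_id STRING," +
    "  amount DECIMAL(38, 18)" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'orders'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'properties.group.id' = 'test'," +
    "  'scan.startup.mode' = 'earliest-offset'," +
    "  'format' = 'json'" +
    ")");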

On Mon, 16 Nov 2020 at 18:39, Luna Wong  wrote:

> https://issues.apache.org/jira/browse/FLINK-20170
>
> This is the issue I filed today.
> Jackson deserializes it this way and converts BigDecimal into Double, so decimals with more than 12 digits lose precision. What should I do in this case? For now I can only treat the field as a STRING type, or modify the source of this JSON format package and rebuild it.
> Are there any other best practices?
>


Re: Does the debezium-changelog in flink 1.11.1 currently not support Watermark?

2020-11-16 Post by Jark Wu
Yes, this is not supported yet.
It will be supported in the 1.12 release.
What is the purpose of defining the watermark? A window aggregation?
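
For reference, on 1.12 a watermark on a debezium changelog source would look roughly like this; table, topic, and column names are placeholders:

// Flink 1.12+ only; on 1.11 the WATERMARK clause below is rejected for a changelog source.
tEnv.executeSql(
    "CREATE TABLE products (" +
    "  id BIGINT," +
    "  name STRING," +
    "  update_time TIMESTAMP(3)," +
    "  WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'products'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'debezium-json'" +
    ")");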

On Tue, 17 Nov 2020 at 10:53, shimin huang 
wrote:

> Error log:
> ```
> Currently, defining WATERMARK on a changelog source is not supported
> ```
>

