UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
Hi: 我自定义了一个UpsertStreamTableSink,在接入cdc数据向我的这个Sink写数据的时候,抛出了下面的异常: Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) 我在构建tableSchema的时候,已经设置了primary key了,但依旧抛出这个错误。 我的flink版本是flink-1.12.0的。 请教一下,这个问题,该怎么解决? 祝好! automths
Re: 回复:flink使用hive作为维表,kafka作为数据源,join时候报错
写on了,刚发现描述的问题没有写全 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 回复:flink使用hive作为维表,kafka作为数据源,join时候报错
sorry , 描述没写全,是有on的 -- Sent from: http://apache-flink.147419.n8.nabble.com/
flink ????ceph
Hi ALL?? ??flink ceph ceph Best, Luke Yan
Flink etl 的应用场景
大家好: 方向:ETL 除了延迟上的区别,离线能实现的,flink 实时实现不了的应用场景有哪些或者有缺陷的点?
回复:flink使用hive作为维表,kafka作为数据源,join时候报错
你看异常信息,提示时态表join的时候需要主键,但是你没有定义。而且你join的时候不需要on吗? | | allanqinjy | | allanqi...@163.com | 签名由网易邮箱大师定制 在2021年05月14日 09:32,hehuiyuan 写道: select FROM jdqTableSources AS a JOIN tmmmp FOR SYSTEM_TIME AS OF a.proctime AS b Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Temporal Table Join requires primary key in versioned table, but no primary key can be found. The physical plan is: FlinkLogicalJoin(condition=[AND(=($0, $4), __INITIAL_TEMPORAL_JOIN_CONDITION($3, __TEMPORAL_JOIN_LEFT_KEY($0), __TEMPORAL_JOIN_RIGHT_KEY($4)))], joinType=[inner]) FlinkLogicalCalc(select=[opt, src, cur, PROCTIME() AS proctime]) FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, jdqTableSources]], fields=[mid, db, sch, tab, opt, ts, ddl, err, src, cur, cus]) FlinkLogicalSnapshot(period=[$cor0.proctime]) FlinkLogicalCalc(select=[item_sku_id, premium, cate_lev, type, borc]) FlinkLogicalTableSourceScan(table=[[myhive, dev, dev_brokenscreen_insurance_sku_info]], fields=[item_sku_id, item_sku_name, premium, cate_lev, type, borc, plan_code, subjection_b, product_name, lev_low_price, lev_upp_price, jd_price, shelves_tm, item_first_cate_name, item_second_cate_name, item_third_cate_name, sure_cate_lev, flag]) -- Sent from: http://apache-flink.147419.n8.nabble.com/
flink使用hive作为维表,kafka作为数据源,join时候报错
select FROM jdqTableSources AS a JOIN tmmmp FOR SYSTEM_TIME AS OF a.proctime AS b Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Temporal Table Join requires primary key in versioned table, but no primary key can be found. The physical plan is: FlinkLogicalJoin(condition=[AND(=($0, $4), __INITIAL_TEMPORAL_JOIN_CONDITION($3, __TEMPORAL_JOIN_LEFT_KEY($0), __TEMPORAL_JOIN_RIGHT_KEY($4)))], joinType=[inner]) FlinkLogicalCalc(select=[opt, src, cur, PROCTIME() AS proctime]) FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, jdqTableSources]], fields=[mid, db, sch, tab, opt, ts, ddl, err, src, cur, cus]) FlinkLogicalSnapshot(period=[$cor0.proctime]) FlinkLogicalCalc(select=[item_sku_id, premium, cate_lev, type, borc]) FlinkLogicalTableSourceScan(table=[[myhive, dev, dev_brokenscreen_insurance_sku_info]], fields=[item_sku_id, item_sku_name, premium, cate_lev, type, borc, plan_code, subjection_b, product_name, lev_low_price, lev_upp_price, jd_price, shelves_tm, item_first_cate_name, item_second_cate_name, item_third_cate_name, sure_cate_lev, flag]) -- Sent from: http://apache-flink.147419.n8.nabble.com/
Mysql cdc 事件时间
你好,想问下mysql cdc做维度表left join的时候,能使用处理时间吗,我测试貌似只能使用事件时间 发自我的iPhone
请教flink cep如何对无序数据处理
兄弟们,想问下Flink的CEP能够对无序数据流进行处理匹配嘛? 我这里指的无序事件是:例如有两个事件,事件A和事件B,在一个时间窗口内,只要匹配到了A和B,不论A和B的到来顺序,我都认为是符合我的条件
FlinkCEP 尽可能多的匹配的问题
我有一个flinkCEP程序,采用eventTime,监控形如如下的数据 [13/May/2021:20:45:36 +0800] [13/May/2021:20:45:36 +0800] [13/May/2021:20:45:37 +0800] [13/May/2021:20:45:37 +0800] [13/May/2021:20:45:50 +0800] 程序中关键设置如下: 设置了水印延迟2s 跳过测略AfterMatchSkipStrategy.skipPastLastEvent() .times(3) .within(Time.seconds(3)); 结果得到如下结果: detected 3 access in 60s from same ip...[/45:36, /45:36, /45:37] 迟到输出的数据...[/45:37],发生超时的时间戳是::2021-05-13 08:45:40 其实我想得到结果是: 在[13/May/2021:20:45:50 +0800]这条数据到来时,我想得到这样的结果:detected 3 access in 60s from same ip...[/45:36, /45:36, /45:37, /45:37] ;因为他们都满足我.times(3).within(Time.seconds(3))的设置; 所以我应该怎样做? -- Sent from: http://apache-flink.147419.n8.nabble.com/
类型转换问题 String 类型如何转 decimal类型
source 端接收到的数据类型为 String, sink 端 MySQL 数据库字段类型定义为 decimal(12, 2) , 在编写 insert 语句时该如何把 String 类型转换为 decimal, 尝试了 flink build-in 的 cast 并不行,请问各位有什么好的方法? -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re:回复:flink sql写hbase问题
不是,原因找到了,是函数多次嵌套导致,flink原始类型是not null,不能转换为string,这个报错信息真的是蛋疼,让人迷惑 在 2021-05-13 10:09:49,"allanqinjy" 写道: >光看异常,应该是你插入了空值吧,你插入hbase的时候做个filter过滤吧,比如你的rowkey空了,你往hbase插入应该是不行的。你可以试试。 > > >| | >allanqinjy >| >| >allanqi...@163.com >| >签名由网易邮箱大师定制 > > >在2021年05月12日 19:23,酷酷的浑蛋 写道: >Mismatch of function's argument data type 'STRING NOT NULL' and actual >argument type 'STRING'.sql有些长,大概就是在执行 insert hbase sql时 报了上面的错误,请问这种错误是什么原因?
flink sql怎样将change log stream转换成append log stream?
flink sql怎样将change log stream转换成append log stream? 通过cdc接入了change log stream,后面就不能使用窗口聚合了,像Tumble和Hop, Session窗口。只能通过state ttl + group by timestamp这种方式聚合。 问一下有没有办法将change log stream转换成append log stream,从而可以使用上述窗口聚合了呢?谢谢!
Re: Flink 1.11版本LeaseRenewer线程不释放
并没有定位到具体原因,只能靠重启作业缓解。。。 zhisheng 于2021年5月13日周四 下午4:20写道: > 你好,这个问题后来定位到问题了吗? > > 我们生产也有一个作业有这样的问题,Flink 版本是 1.10.0,这个作业是 JM 的线程数很多(快 6k),作业是 flink 读取 > Kafka,会关联 HBase ,开启了 Checkpoint,就这个作业有问题,很奇怪 > > https://tva1.sinaimg.cn/large/008i3skNgy1gqgvhdu674j31je0u0795.jpg > > zilong xiao 于2020年12月8日周二 下午6:21写道: > > > 作业数据流是 kafka -> flink -> > > http/prometheus,目前这类型的作业很多,但是就只有那几个有问题,而且是必现,每次都只能重启,然后看着线程数上涨。。 > 我再debug看看~ > > > > Paul Lam 于2020年12月8日周二 下午6:00写道: > > > > > Hi, > > > > > > 我之前说的多个集群的情况主要指写入数据到 HDFS。如果只有 checkpoint 依赖 HDFS 而出现这种情况的话,的确是非常奇怪。 > > > > > > Best, > > > Paul Lam > > > > > > > 2020年12月8日 11:03,zilong xiao 写道: > > > > > > > > Hi Paul, > > > >线程名称是一模一样的,都是user1@cluserA,HDFS client版本对于用户来说是透明的,作业使用的是Flink > > > > > > > > > > 1.11版本,该Flink版本使用HDFS版本好像是2.8.1,在Flink中和集群有持续交互的就只能想到checkpoint,开了DEBUG日志也没能找到root > > > > cause。。 > > > > > > > >另外 您说的“线程个数应该和用到的 HDFS 集群数目相同”不是很理解,作业只能提交到一个具体的集群吧? > > > > > > > > Paul Lam 于2020年12月8日周二 上午10:45写道: > > > > > > > >> 我记得 LeaseRenewer 是 JVM 级别的,线程个数应该和用到的 HDFS 集群数目相同。 > > > >> > > > >> 你看看它们具体的线程名是不是完全相同(比如都是 user1@cluserA)?还有 HDFS client 的版本是什么? > > > >> > > > >> Best, > > > >> Paul Lam > > > >> > > > >>> 2020年12月7日 18:11,zilong xiao 写道: > > > >>> > > > >>> 在生产中发现有个别Flink SQL 1.11作业的container线程数很高,查看Thread > > > Dump发现有很多名为LeaseRenewer > > > >>> 的线程处于TIMED_WAITING状态,目前只能复现其现象,但是无法定位原因,不知道社区是否有类似经历的小伙伴呢? > > > >>> > > > >>> Flink version: 1.11 > > > >>> State backend:filesystem > > > >>> checkpoint interval: 60s > > > >> > > > >> > > > > > > > > >
?????? Flink 1.11????LeaseRenewer??????????
Flink java ?? ---- ??: "zhisheng"https://tva1.sinaimg.cn/large/008i3skNgy1gqgvhdu674j31je0u0795.jpg zilong xiao
Re: Flink 1.11版本LeaseRenewer线程不释放
你好,这个问题后来定位到问题了吗? 我们生产也有一个作业有这样的问题,Flink 版本是 1.10.0,这个作业是 JM 的线程数很多(快 6k),作业是 flink 读取 Kafka,会关联 HBase ,开启了 Checkpoint,就这个作业有问题,很奇怪 https://tva1.sinaimg.cn/large/008i3skNgy1gqgvhdu674j31je0u0795.jpg zilong xiao 于2020年12月8日周二 下午6:21写道: > 作业数据流是 kafka -> flink -> > http/prometheus,目前这类型的作业很多,但是就只有那几个有问题,而且是必现,每次都只能重启,然后看着线程数上涨。。 我再debug看看~ > > Paul Lam 于2020年12月8日周二 下午6:00写道: > > > Hi, > > > > 我之前说的多个集群的情况主要指写入数据到 HDFS。如果只有 checkpoint 依赖 HDFS 而出现这种情况的话,的确是非常奇怪。 > > > > Best, > > Paul Lam > > > > > 2020年12月8日 11:03,zilong xiao 写道: > > > > > > Hi Paul, > > >线程名称是一模一样的,都是user1@cluserA,HDFS client版本对于用户来说是透明的,作业使用的是Flink > > > > > > 1.11版本,该Flink版本使用HDFS版本好像是2.8.1,在Flink中和集群有持续交互的就只能想到checkpoint,开了DEBUG日志也没能找到root > > > cause。。 > > > > > >另外 您说的“线程个数应该和用到的 HDFS 集群数目相同”不是很理解,作业只能提交到一个具体的集群吧? > > > > > > Paul Lam 于2020年12月8日周二 上午10:45写道: > > > > > >> 我记得 LeaseRenewer 是 JVM 级别的,线程个数应该和用到的 HDFS 集群数目相同。 > > >> > > >> 你看看它们具体的线程名是不是完全相同(比如都是 user1@cluserA)?还有 HDFS client 的版本是什么? > > >> > > >> Best, > > >> Paul Lam > > >> > > >>> 2020年12月7日 18:11,zilong xiao 写道: > > >>> > > >>> 在生产中发现有个别Flink SQL 1.11作业的container线程数很高,查看Thread > > Dump发现有很多名为LeaseRenewer > > >>> 的线程处于TIMED_WAITING状态,目前只能复现其现象,但是无法定位原因,不知道社区是否有类似经历的小伙伴呢? > > >>> > > >>> Flink version: 1.11 > > >>> State backend:filesystem > > >>> checkpoint interval: 60s > > >> > > >> > > > > >
?????? flink on k8s native ????????
Hi Yang?? flink session on k8s ??examples application ?? flink run-application \ --target kubernetes-application \ -Dkubernetes.namespace=flink-session-cluster \ -Dkubernetes.cluster-id=my-first-application-cluster \ -Dkubernetes.container.image=flink:latest \ local:///opt/flink/examples/batch/WordCount.jar deployment ?? service pod?? describe pods ?? ?? configmap flink-config-my-first-application-cluster?? ?? kubectl describe ??flink-config-my-first-application-cluster ?? describe pods ?? Name: my-first-application-cluster-6c85b474-64t6x Namespace: flink-session-cluster Priority: 0 Node:hkctttl104/10.199.252.104 Start Time: Thu, 13 May 2021 22:16:03 +0800 Labels: app=my-first-application-cluster component=jobmanager pod-template-hash=6c85b474 type=flink-native-kubernetes Annotations: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#rbac; permissions to create, delete pods ?? kubectl create namespace flink-session-cluster kubectl create serviceaccount flink -n flink-session-cluster kubectl create clusterrolebinding flink-role-binding-flink \ --clusterrole =edit \ --serviceaccount=flink-session-cluster:flink session ?? flink 3.1 session ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster \ -Dkubernetes.namespace=flink-session-cluster \ -Dkubernetes.jobmanager.service-account=flink \ -Dkubernetes.service.exposed.type=NodePort ?? 3.2 flink ?? ./bin/flink run \ --target kubernetes-session \ -Dkubernetes.namespace=flink-session-cluster \ -Dkubernetes.cluster-id=my-first-flink-cluster \ -Dkubernetes.jobmanager.service-account=flink \ ./examples/streaming/TopSpeedWindowing.jar ?? Executing TopSpeedWindowing example with default input data set. Use --input to specify file input. Printing result to stdout. Use --output to specify output path. WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/home/dsi/soft/flink/flink-1.12.2/lib/flink-dist_2.11-1.12.2.jar) to field java.lang.String.value WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release 2021-05-12 15:51:30,453 INFO org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Retrieve flink cluster my-first-flink-cluster successfully, JobManager Web Interface: http://10.199.252.101:8081 The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'CarTopSpeedWindowingExample'. at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'CarTopSpeedWindowingExample'. at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918) at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135) at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782) at org.apache.flink.streaming.examples.windowing.TopSpeedWindowing.main(TopSpeedWindowing.java:99) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at