UpsertStreamTableSink requires that Table has a full primary keys if it is updated.

2021-05-13 文章 automths
Hi:

我自定义了一个UpsertStreamTableSink,在接入cdc数据向我的这个Sink写数据的时候,抛出了下面的异常:
Exception in thread "main" org.apache.flink.table.api.TableException: 
UpsertStreamTableSink requires that Table has a full primary keys if it is 
updated.
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)


我在构建tableSchema的时候,已经设置了primary key了,但依旧抛出这个错误。
我的flink版本是flink-1.12.0的。


请教一下,这个问题,该怎么解决?




祝好!
automths





Re: 回复:flink使用hive作为维表,kafka作为数据源,join时候报错

2021-05-13 文章 hehuiyuan
写on了,刚发现描述的问题没有写全



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 回复:flink使用hive作为维表,kafka作为数据源,join时候报错

2021-05-13 文章 hehuiyuan
sorry , 描述没写全,是有on的



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink ????ceph

2021-05-13 文章 ????????????????
Hi ALL??


??flink  ceph  ceph 




Best,


Luke Yan

Flink etl 的应用场景

2021-05-13 文章 孙啸龙
大家好:
   方向:ETL
   除了延迟上的区别,离线能实现的,flink 实时实现不了的应用场景有哪些或者有缺陷的点?

回复:flink使用hive作为维表,kafka作为数据源,join时候报错

2021-05-13 文章 allanqinjy
你看异常信息,提示时态表join的时候需要主键,但是你没有定义。而且你join的时候不需要on吗?


| |
allanqinjy
|
|
allanqi...@163.com
|
签名由网易邮箱大师定制


在2021年05月14日 09:32,hehuiyuan 写道:
select 
FROM  jdqTableSources AS a
JOIN tmmmp FOR SYSTEM_TIME AS OF a.proctime AS b



Caused by: org.apache.flink.client.program.ProgramInvocationException: The
main method caused an error: Temporal Table Join requires primary key in
versioned table, but no primary key can be found. The physical plan is:
FlinkLogicalJoin(condition=[AND(=($0, $4),
__INITIAL_TEMPORAL_JOIN_CONDITION($3, __TEMPORAL_JOIN_LEFT_KEY($0),
__TEMPORAL_JOIN_RIGHT_KEY($4)))], joinType=[inner])
FlinkLogicalCalc(select=[opt, src, cur, PROCTIME() AS proctime])
FlinkLogicalTableSourceScan(table=[[default_catalog, default_database,
jdqTableSources]], fields=[mid, db, sch, tab, opt, ts, ddl, err, src, cur,
cus])
FlinkLogicalSnapshot(period=[$cor0.proctime])
FlinkLogicalCalc(select=[item_sku_id, premium, cate_lev, type, borc])
FlinkLogicalTableSourceScan(table=[[myhive, dev,
dev_brokenscreen_insurance_sku_info]], fields=[item_sku_id, item_sku_name,
premium, cate_lev, type, borc, plan_code, subjection_b, product_name,
lev_low_price, lev_upp_price, jd_price, shelves_tm, item_first_cate_name,
item_second_cate_name, item_third_cate_name, sure_cate_lev, flag])



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink使用hive作为维表,kafka作为数据源,join时候报错

2021-05-13 文章 hehuiyuan
select 
FROM  jdqTableSources AS a
JOIN tmmmp FOR SYSTEM_TIME AS OF a.proctime AS b



Caused by: org.apache.flink.client.program.ProgramInvocationException: The
main method caused an error: Temporal Table Join requires primary key in
versioned table, but no primary key can be found. The physical plan is:
FlinkLogicalJoin(condition=[AND(=($0, $4),
__INITIAL_TEMPORAL_JOIN_CONDITION($3, __TEMPORAL_JOIN_LEFT_KEY($0),
__TEMPORAL_JOIN_RIGHT_KEY($4)))], joinType=[inner])
  FlinkLogicalCalc(select=[opt, src, cur, PROCTIME() AS proctime])
FlinkLogicalTableSourceScan(table=[[default_catalog, default_database,
jdqTableSources]], fields=[mid, db, sch, tab, opt, ts, ddl, err, src, cur,
cus])
  FlinkLogicalSnapshot(period=[$cor0.proctime])
FlinkLogicalCalc(select=[item_sku_id, premium, cate_lev, type, borc])
  FlinkLogicalTableSourceScan(table=[[myhive, dev,
dev_brokenscreen_insurance_sku_info]], fields=[item_sku_id, item_sku_name,
premium, cate_lev, type, borc, plan_code, subjection_b, product_name,
lev_low_price, lev_upp_price, jd_price, shelves_tm, item_first_cate_name,
item_second_cate_name, item_third_cate_name, sure_cate_lev, flag])



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Mysql cdc 事件时间

2021-05-13 文章 流弊
你好,想问下mysql cdc做维度表left join的时候,能使用处理时间吗,我测试貌似只能使用事件时间



发自我的iPhone

请教flink cep如何对无序数据处理

2021-05-13 文章 sherlock zw
兄弟们,想问下Flink的CEP能够对无序数据流进行处理匹配嘛?
我这里指的无序事件是:例如有两个事件,事件A和事件B,在一个时间窗口内,只要匹配到了A和B,不论A和B的到来顺序,我都认为是符合我的条件


FlinkCEP 尽可能多的匹配的问题

2021-05-13 文章 lp
我有一个flinkCEP程序,采用eventTime,监控形如如下的数据
[13/May/2021:20:45:36 +0800]
[13/May/2021:20:45:36 +0800]
[13/May/2021:20:45:37 +0800]
[13/May/2021:20:45:37 +0800]
[13/May/2021:20:45:50 +0800]



程序中关键设置如下:
设置了水印延迟2s
跳过测略AfterMatchSkipStrategy.skipPastLastEvent()

.times(3)
.within(Time.seconds(3));



结果得到如下结果:
detected 3 access in 60s from same ip...[/45:36, /45:36, /45:37]
迟到输出的数据...[/45:37],发生超时的时间戳是::2021-05-13 08:45:40


其实我想得到结果是:
在[13/May/2021:20:45:50 +0800]这条数据到来时,我想得到这样的结果:detected 3 access in 60s from
same ip...[/45:36, /45:36, /45:37, /45:37]
;因为他们都满足我.times(3).within(Time.seconds(3))的设置;

所以我应该怎样做?





--
Sent from: http://apache-flink.147419.n8.nabble.com/


类型转换问题 String 类型如何转 decimal类型

2021-05-13 文章 WeiXubin
source 端接收到的数据类型为 String,  sink 端 MySQL 数据库字段类型定义为 decimal(12, 2)  , 在编写
insert 语句时该如何把 String 类型转换为 decimal, 尝试了 flink build-in 的 cast
并不行,请问各位有什么好的方法?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re:回复:flink sql写hbase问题

2021-05-13 文章 酷酷的浑蛋
不是,原因找到了,是函数多次嵌套导致,flink原始类型是not null,不能转换为string,这个报错信息真的是蛋疼,让人迷惑

















在 2021-05-13 10:09:49,"allanqinjy"  写道:
>光看异常,应该是你插入了空值吧,你插入hbase的时候做个filter过滤吧,比如你的rowkey空了,你往hbase插入应该是不行的。你可以试试。
>
>
>| |
>allanqinjy
>|
>|
>allanqi...@163.com
>|
>签名由网易邮箱大师定制
>
>
>在2021年05月12日 19:23,酷酷的浑蛋 写道:
>Mismatch of function's argument data type 'STRING NOT NULL' and actual 
>argument type 'STRING'.sql有些长,大概就是在执行  insert hbase sql时 报了上面的错误,请问这种错误是什么原因?


flink sql怎样将change log stream转换成append log stream?

2021-05-13 文章 casel.chen
flink sql怎样将change log stream转换成append log stream?
通过cdc接入了change log stream,后面就不能使用窗口聚合了,像Tumble和Hop, Session窗口。只能通过state ttl + 
group by timestamp这种方式聚合。
问一下有没有办法将change log stream转换成append log stream,从而可以使用上述窗口聚合了呢?谢谢!

Re: Flink 1.11版本LeaseRenewer线程不释放

2021-05-13 文章 zilong xiao
并没有定位到具体原因,只能靠重启作业缓解。。。

zhisheng  于2021年5月13日周四 下午4:20写道:

> 你好,这个问题后来定位到问题了吗?
>
> 我们生产也有一个作业有这样的问题,Flink 版本是 1.10.0,这个作业是 JM 的线程数很多(快 6k),作业是 flink 读取
> Kafka,会关联 HBase ,开启了 Checkpoint,就这个作业有问题,很奇怪
>
> https://tva1.sinaimg.cn/large/008i3skNgy1gqgvhdu674j31je0u0795.jpg
>
> zilong xiao  于2020年12月8日周二 下午6:21写道:
>
> > 作业数据流是 kafka -> flink ->
> > http/prometheus,目前这类型的作业很多,但是就只有那几个有问题,而且是必现,每次都只能重启,然后看着线程数上涨。。
> 我再debug看看~
> >
> > Paul Lam  于2020年12月8日周二 下午6:00写道:
> >
> > > Hi,
> > >
> > > 我之前说的多个集群的情况主要指写入数据到 HDFS。如果只有 checkpoint 依赖 HDFS 而出现这种情况的话,的确是非常奇怪。
> > >
> > > Best,
> > > Paul Lam
> > >
> > > > 2020年12月8日 11:03,zilong xiao  写道:
> > > >
> > > > Hi Paul,
> > > >线程名称是一模一样的,都是user1@cluserA,HDFS client版本对于用户来说是透明的,作业使用的是Flink
> > > >
> > >
> >
> 1.11版本,该Flink版本使用HDFS版本好像是2.8.1,在Flink中和集群有持续交互的就只能想到checkpoint,开了DEBUG日志也没能找到root
> > > > cause。。
> > > >
> > > >另外 您说的“线程个数应该和用到的 HDFS 集群数目相同”不是很理解,作业只能提交到一个具体的集群吧?
> > > >
> > > > Paul Lam  于2020年12月8日周二 上午10:45写道:
> > > >
> > > >> 我记得 LeaseRenewer 是 JVM 级别的,线程个数应该和用到的 HDFS 集群数目相同。
> > > >>
> > > >> 你看看它们具体的线程名是不是完全相同(比如都是 user1@cluserA)?还有 HDFS client 的版本是什么?
> > > >>
> > > >> Best,
> > > >> Paul Lam
> > > >>
> > > >>> 2020年12月7日 18:11,zilong xiao  写道:
> > > >>>
> > > >>> 在生产中发现有个别Flink SQL 1.11作业的container线程数很高,查看Thread
> > > Dump发现有很多名为LeaseRenewer
> > > >>> 的线程处于TIMED_WAITING状态,目前只能复现其现象,但是无法定位原因,不知道社区是否有类似经历的小伙伴呢?
> > > >>>
> > > >>> Flink version: 1.11
> > > >>> State backend:filesystem
> > > >>> checkpoint interval: 60s
> > > >>
> > > >>
> > >
> > >
> >
>


?????? Flink 1.11????LeaseRenewer??????????

2021-05-13 文章 5599
Flink java ??




----
??: "zhisheng"https://tva1.sinaimg.cn/large/008i3skNgy1gqgvhdu674j31je0u0795.jpg

zilong xiao 

Re: Flink 1.11版本LeaseRenewer线程不释放

2021-05-13 文章 zhisheng
你好,这个问题后来定位到问题了吗?

我们生产也有一个作业有这样的问题,Flink 版本是 1.10.0,这个作业是 JM 的线程数很多(快 6k),作业是 flink 读取
Kafka,会关联 HBase ,开启了 Checkpoint,就这个作业有问题,很奇怪

https://tva1.sinaimg.cn/large/008i3skNgy1gqgvhdu674j31je0u0795.jpg

zilong xiao  于2020年12月8日周二 下午6:21写道:

> 作业数据流是 kafka -> flink ->
> http/prometheus,目前这类型的作业很多,但是就只有那几个有问题,而且是必现,每次都只能重启,然后看着线程数上涨。。 我再debug看看~
>
> Paul Lam  于2020年12月8日周二 下午6:00写道:
>
> > Hi,
> >
> > 我之前说的多个集群的情况主要指写入数据到 HDFS。如果只有 checkpoint 依赖 HDFS 而出现这种情况的话,的确是非常奇怪。
> >
> > Best,
> > Paul Lam
> >
> > > 2020年12月8日 11:03,zilong xiao  写道:
> > >
> > > Hi Paul,
> > >线程名称是一模一样的,都是user1@cluserA,HDFS client版本对于用户来说是透明的,作业使用的是Flink
> > >
> >
> 1.11版本,该Flink版本使用HDFS版本好像是2.8.1,在Flink中和集群有持续交互的就只能想到checkpoint,开了DEBUG日志也没能找到root
> > > cause。。
> > >
> > >另外 您说的“线程个数应该和用到的 HDFS 集群数目相同”不是很理解,作业只能提交到一个具体的集群吧?
> > >
> > > Paul Lam  于2020年12月8日周二 上午10:45写道:
> > >
> > >> 我记得 LeaseRenewer 是 JVM 级别的,线程个数应该和用到的 HDFS 集群数目相同。
> > >>
> > >> 你看看它们具体的线程名是不是完全相同(比如都是 user1@cluserA)?还有 HDFS client 的版本是什么?
> > >>
> > >> Best,
> > >> Paul Lam
> > >>
> > >>> 2020年12月7日 18:11,zilong xiao  写道:
> > >>>
> > >>> 在生产中发现有个别Flink SQL 1.11作业的container线程数很高,查看Thread
> > Dump发现有很多名为LeaseRenewer
> > >>> 的线程处于TIMED_WAITING状态,目前只能复现其现象,但是无法定位原因,不知道社区是否有类似经历的小伙伴呢?
> > >>>
> > >>> Flink version: 1.11
> > >>> State backend:filesystem
> > >>> checkpoint interval: 60s
> > >>
> > >>
> >
> >
>


?????? flink on k8s native ????????

2021-05-13 文章 ????????????????
Hi Yang??


flink session on k8s ??examples  
application ??








flink run-application \
--target kubernetes-application \
-Dkubernetes.namespace=flink-session-cluster \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image=flink:latest \
local:///opt/flink/examples/batch/WordCount.jar



 deployment  ??



service 





pod??





 describe pods ??





?? configmap 
flink-config-my-first-application-cluster?? ?? kubectl 
describe ??flink-config-my-first-application-cluster





?? describe pods ??


Name:
my-first-application-cluster-6c85b474-64t6x
Namespace:  flink-session-cluster
Priority:  0
Node:hkctttl104/10.199.252.104
Start Time: Thu, 13 May 2021 22:16:03 +0800
Labels:   app=my-first-application-cluster
   component=jobmanager
   pod-template-hash=6c85b474
   type=flink-native-kubernetes
Annotations: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#rbac;
 permissions
 to create, delete pods  
??

 kubectl create namespace flink-session-cluster

 kubectl create serviceaccount flink -n flink-session-cluster

 kubectl create clusterrolebinding flink-role-binding-flink \ --clusterrole
 =edit \ --serviceaccount=flink-session-cluster:flink


  session ?? flink 

 3.1 session 

 ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster
 \
  -Dkubernetes.namespace=flink-session-cluster \
  -Dkubernetes.jobmanager.service-account=flink \
  -Dkubernetes.service.exposed.type=NodePort

 ??


 3.2 flink 

 ??
 ./bin/flink run \
  --target kubernetes-session \
  -Dkubernetes.namespace=flink-session-cluster \
  
-Dkubernetes.cluster-id=my-first-flink-cluster \
  -Dkubernetes.jobmanager.service-account=flink \
  ./examples/streaming/TopSpeedWindowing.jar

 ??

 Executing TopSpeedWindowing example with default input data set.
 Use --input to specify file input.
 Printing result to stdout. Use --output to specify output path.
 WARNING: An illegal reflective access operation has occurred
 WARNING: Illegal reflective access by
 org.apache.flink.api.java.ClosureCleaner
 (file:/home/dsi/soft/flink/flink-1.12.2/lib/flink-dist_2.11-1.12.2.jar) to
 field java.lang.String.value
 WARNING: Please consider reporting this to the maintainers of
 org.apache.flink.api.java.ClosureCleaner
 WARNING: Use --illegal-access=warn to enable warnings of further illegal
 reflective access operations
 WARNING: All illegal access operations will be denied in a future release
 2021-05-12 15:51:30,453 INFO
 
org.apache.flink.kubernetes.KubernetesClusterDescriptor
 [] - Retrieve
 flink cluster my-first-flink-cluster successfully, JobManager Web
 Interface: http://10.199.252.101:8081

 
 The program finished with the following exception:

 org.apache.flink.client.program.ProgramInvocationException: The main
 method caused an error: Failed to execute job 
'CarTopSpeedWindowingExample'.
 at
 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
 at
 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
 at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
 at
 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
 at
 org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
 at
 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
 at
 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
 Caused by: org.apache.flink.util.FlinkException: Failed to execute job
 'CarTopSpeedWindowingExample'.
 at
 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
 at
 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
 at
 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
 at
 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
 at
 
org.apache.flink.streaming.examples.windowing.TopSpeedWindowing.main(TopSpeedWindowing.java:99)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
 Method)
 at
 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at