Re:Re: Flink SQL Kafka insert into MySQL: INSERT syntax and unique primary key issues

2020-07-31 Post by chenxuying
hi
OK, thanks, got it now, haha

On 2020-07-31 21:27:02, "Leonard Xu" wrote:
>Hello
>
>> On Jul 31, 2020 at 21:13, chenxuying wrote:
>> 
>> But I don't quite understand what "with the old options the primary key still has to be derived from the query" means here, or what I need to do.
>
>In short, if you use the old (1.10) option keys, the code takes the same execution path as in 1.10. Version 1.10 does not support declaring a PRIMARY KEY;
>instead, the user's query is analyzed to decide whether the sink writes in upsert or append mode. You can read the 1.10 documentation on deriving the write mode from the query [1];
>if you are already on 1.11, you can skip the 1.10 docs.
> 
>In 1.10 it frequently happened that no key could be derived from the query, so upsert writes were impossible. In 1.11 this class of problems is gone because the
>PRIMARY KEY can be declared explicitly; see the 1.11 documentation [2].
>
>Best,
>Leonard
>
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector
> 
>
>[2] 
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
> 
>
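
(For reference: a minimal sketch of what the sink DDL from this thread might look like with the new 1.11 option keys, so that the declared PRIMARY KEY drives upsert writes. It is adapted from the DDL quoted later in this digest; the host, credentials and flush settings are just the placeholder values from that mail, and the option keys are the ones listed in the 1.11 JDBC connector docs [2].)

tableEnvironment.executeSql(
    " CREATE TABLE mysqlsink ( " +
    "   id bigint, " +
    "   game_id varchar, " +
    "   PRIMARY KEY (id) NOT ENFORCED " +
    " ) WITH ( " +
    "   'connector' = 'jdbc', " +            // new option key, replaces 'connector.type'
    "   'url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false', " +
    "   'table-name' = 'mysqlsink', " +
    "   'username' = 'root', " +
    "   'password' = 'root', " +
    "   'driver' = 'com.mysql.cj.jdbc.Driver', " +
    "   'sink.buffer-flush.interval' = '2s', " +
    "   'sink.buffer-flush.max-rows' = '300' " +
    " )");
// With this table, "insert into mysqlsink select a, cast(b as varchar) b from mySource"
// performs upserts on id instead of failing with a duplicate-key error.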


Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-07-31 Post by Hequn Cheng
Hi Jincheng,

Thanks a lot for raising the discussion. +1 for the FLIP.

I think this will bring big benefits for PyFlink users. Currently, the
Python Table API documentation is hidden deep under the Table API tab, which
makes it hard to read. Also, the PyFlink documentation is mixed with the
Java/Scala documentation, so it is hard for users to get an overview of all
the PyFlink documents. As more and more functionality is added to
PyFlink, I think it's time for us to refactor the documentation.

Best,
Hequn


On Fri, Jul 31, 2020 at 3:43 PM Marta Paes Moreira wrote:

> Hi, Jincheng!
>
> Thanks for creating this detailed FLIP, it will make a big difference in
> the experience of Python developers using Flink. I'm interested in
> contributing to this work, so I'll reach out to you offline!
>
> Also, thanks for sharing some information on the adoption of PyFlink, it's
> great to see that there are already production users.
>
> Marta
>
> On Fri, Jul 31, 2020 at 5:35 AM Xingbo Huang  wrote:
>
> > Hi Jincheng,
> >
> > Thanks a lot for bringing up this discussion and the proposal.
> >
> > Big +1 for improving the structure of PyFlink doc.
> >
> > It will be very friendly to give PyFlink users a unified entrance to
> learn
> > PyFlink documents.
> >
> > Best,
> > Xingbo
> >
> > Dian Fu wrote on Fri, Jul 31, 2020 at 11:00 AM:
> >
> >> Hi Jincheng,
> >>
> >> Thanks a lot for bringing up this discussion and the proposal. +1 to
> >> improve the Python API doc.
> >>
> >> I have received a lot of feedback from PyFlink beginners about
> >> the PyFlink docs, e.g. the materials are too few, the Python docs are mixed
> >> with the Java docs, and it's not easy to find the docs they want.
> >>
> >> I think it would greatly improve the user experience if we could have one
> >> place that includes most of the knowledge PyFlink users need.
> >>
> >> Regards,
> >> Dian
> >>
> >> On Jul 31, 2020 at 10:14 AM, jincheng sun wrote:
> >>
> >> Hi folks,
> >>
> >> Since the release of Flink 1.11, the number of PyFlink users has continued to grow.
> >> As far as I know, many companies have already put PyFlink into production for data
> >> analysis and operations/maintenance monitoring (such as 聚美优品 [1] (Jumei),
> >> 浙江墨芷 [2] (Mozhi), etc.). According to the feedback we have received, the current
> >> documentation is not very friendly to PyFlink users. There are two shortcomings:
> >>
> >> - Python-related content is mixed into the Java/Scala documentation, which
> >> makes it difficult to read for users who only care about PyFlink.
> >> - There is already a "Python Table API" section under the Table API documentation
> >> that holds the PyFlink documents, but the number of articles is small and the
> >> content is fragmented. It is difficult for beginners to learn from it.
> >>
> >> In addition, FLIP-130 introduced the Python DataStream API, and many
> >> documents will be added for those new APIs. In order to improve the
> >> readability and maintainability of the PyFlink documentation, Wei Zhong and I
> >> have discussed this offline and would like to rework it via this FLIP.
> >>
> >> We will rework the document around the following three objectives:
> >>
> >> - Add a separate section for the Python API under the "Application
> >> Development" section.
> >> - Restructure the current Python documentation into a brand-new structure to
> >> ensure the content is complete and friendly to beginners.
> >> - Improve the documents shared by Python/Java/Scala to make them more
> >> friendly to Python users without affecting Java/Scala users.
> >>
> >> More detail can be found in the FLIP-133:
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-133%3A+Rework+PyFlink+Documentation
> >>
> >> Best,
> >> Jincheng
> >>
> >> [1] https://mp.weixin.qq.com/s/zVsBIs1ZEFe4atYUYtZpRg
> >> [2] https://mp.weixin.qq.com/s/R4p_a2TWGpESBWr3pLtM2g
> >>
> >>
> >>
>


Behavior for flink job running on K8S failed after restart strategy exhausted

2020-07-31 Post by Eleanore Jin
Hi Experts,

I have a flink cluster (per job mode) running on kubernetes. The job is
configured with restart strategy

restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
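
(For reference, a minimal sketch of the same fixed-delay restart strategy set programmatically; it only restates the two settings above, assuming the job is built with the Java DataStream/Table API. The class name is made up for illustration.)

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Equivalent to restart-strategy.fixed-delay.attempts: 3
        // and restart-strategy.fixed-delay.delay: 10 s
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
        // ... build and execute the job as usual
    }
}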


So after 3 retries, the job will be marked as FAILED and the pods stop
running. However, kubernetes will then restart the job again because the
number of available replicas does not match the desired one.

I wonder what the suggestions are for such a scenario? How should I
configure the flink job running on k8s?

Thanks a lot!
Eleanore


Re: Flink SQL Kafka insert into MySQL: INSERT syntax and unique primary key issues

2020-07-31 Post by Leonard Xu
Hello

> On Jul 31, 2020 at 21:13, chenxuying wrote:
> 
> But I don't quite understand what "with the old options the primary key still has to be derived from the query" means here, or what I need to do.

In short, if you use the old (1.10) option keys, the code takes the same execution path as in 1.10. Version 1.10 does not support declaring a PRIMARY KEY;
instead, the user's query is analyzed to decide whether the sink writes in upsert or append mode. You can read the 1.10 documentation on deriving the write mode from the query [1];
if you are already on 1.11, you can skip the 1.10 docs.
 
In 1.10 it frequently happened that no key could be derived from the query, so upsert writes were impossible. In 1.11 this class of problems is gone because the
PRIMARY KEY can be declared explicitly; see the 1.11 documentation [2].

Best,
Leonard

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector
 

[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
 


Re:Re: Flink SQL Kafka insert into MySQL: INSERT syntax and unique primary key issues

2020-07-31 Post by chenxuying
Thanks for the answer.
With the new options the record is now updated successfully.
But I don't quite understand what "with the old options the primary key still has to be derived from the query" means, or what I need to do.

On 2020-07-31 16:46:41, "Leonard Xu" wrote:
>Hi, chenxuying
>
>I see you are still using "  'connector.type' = 'jdbc', ….  ", which are the old options;
>with the old options the primary key still has to be derived from the query.
>You need to use the new options [1]: " 'connector' = 'jdbc', …. " so that the declared primary key determines the upsert mode.
> 
>Best
>Leonard
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#connector-options
> 
>
>
>> On Jul 31, 2020 at 16:12, chenxuying wrote:
>> 
>> hi
>> I am using Flink 1.11.0.
>> The code is as follows:
>> StreamExecutionEnvironment streamEnv = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);
>> tableEnvironment.executeSql(" " +
>> " CREATE TABLE mySource ( " +
>> "  a bigint, " +
>> "  b bigint " +
>> " ) WITH ( " +
>> "  'connector.type' = 'kafka', " +
>> "  'connector.version' = 'universal', " +
>> "  'connector.topic' = 'mytesttopic', " +
>> "  'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " +
>> "  'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " +
>> "  'connector.properties.group.id' = 'flink-test-cxy', " +
>> "  'connector.startup-mode' = 'latest-offset', " +
>> "  'format.type' = 'json' " +
>> " ) ");
>> tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " +
>> " id bigint, " +
>> "  game_id varchar, " +
>> "  PRIMARY KEY (id) NOT ENFORCED  " +
>> " )  " +
>> " with ( " +
>> "  'connector.type' = 'jdbc',   " +
>> "  'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' 
>> , " +
>> "  'connector.username' = 'root' , " +
>> "  'connector.password' = 'root',  " +
>> "  'connector.table' = 'mysqlsink' , " +
>> "  'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " +
>> "  'connector.write.flush.interval' = '2s',  " +
>> "  'connector.write.flush.max-rows' = '300'  " +
>> " )");
>> tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values 
>> (select a,cast(b as varchar) b from mySource)");
>> 
>> 
>> 问题一 : 上面的insert语句会出现如下错误
>> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot 
>> apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(> A, VARCHAR(2147483647) B)>)'. Supported form(s): 
>> '$SCALAR_QUERY()'
>> 
>> 
>> 问题二 : 如果insert改成 tableEnvironment.executeSql("insert into mysqlsink select 
>> a,cast(b as varchar) b from mySource"); 是可以运行的,但是出现唯一主键重复时会报错
>> Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate 
>> entry '1' for key 'PRIMARY'
>> 
>> 
>> 
>


Re: Flink SQL escape character issue

2020-07-31 Post by Leonard Xu
Hi, zilong

SPLIT_INDEX(${xxx}, ';', 0)

 The semicolon ';' is not a special character, so it should not fail the syntax check. I tested it with DDL on Flink 1.11.1 and it works; I am not sure what your environment looks like.
 U&'\003B' is the unicode escape for ';', so using that escape also works, but that form is usually only needed when you have to split on non-printable characters,
 for example \n corresponds to U&'\\000A' and \r corresponds to U&'\\000D'. For a printable character like the semicolon, the unicode escape is not necessary.

Best,
Leonard 
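
(For reference, a minimal sketch of the two variants discussed here, written as Flink SQL inside a Java executeSql call; the table and column names (mySource, line) are made up for illustration.)

tableEnvironment.executeSql(
    "SELECT " +
    // plain semicolon literal, works as-is
    "  SPLIT_INDEX(line, ';', 0) AS first_field, " +
    // the same split using the unicode escape for ';'
    "  SPLIT_INDEX(line, U&'\\003B', 0) AS first_field_unicode " +
    "FROM mySource");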

> On Jul 31, 2020 at 20:46, zilong xiao wrote:
> 
> Writing it as U&'\003B' works. Feels really strange though...
> 
> 李奇 <359502...@qq.com> wrote on Fri, Jul 31, 2020 at 8:25 PM:
> 
>> Adding a backslash should work: \;  Though the semicolon shouldn't be a special character anyway.
>> 
>>> On Jul 31, 2020 at 8:13 PM, zilong xiao wrote:
>>> 
>>> SPLIT_INDEX(${xxx}, ';', 0): I want to split a string on semicolons, but the semicolon seems to be treated as a special character and the syntax check never passes. Online sources say it can be escaped, but I haven't figured out how. Has anyone hit a similar problem and can give me some pointers?
>> 



Re: Flink SQL escape character issue

2020-07-31 Post by Hannan Kan
According to the official documentation at https://help.aliyun.com/knowledge_detail/62544.html, the signature is
VARCHAR SPLIT_INDEX(VARCHAR str, VARCHAR sep, INT index)
where sep is a string. Could you check whether it is quoted as a string, and whether the semicolon is the ASCII (English) one?


------ Original message ------
From: "user-zh"


Re: Flink SQL escape character issue

2020-07-31 Post by zilong xiao
Writing it as U&'\003B' works. Feels really strange though...

李奇 <359502...@qq.com> wrote on Fri, Jul 31, 2020 at 8:25 PM:

> Adding a backslash should work: \;  Though the semicolon shouldn't be a special character anyway.
>
> > On Jul 31, 2020 at 8:13 PM, zilong xiao wrote:
> >
> > SPLIT_INDEX(${xxx}, ';', 0): I want to split a string on semicolons, but the semicolon seems to be treated as a special character and the syntax check never passes. Online sources say it can be escaped, but I haven't figured out how. Has anyone hit a similar problem and can give me some pointers?
>


Re: Flink SQL escape character issue

2020-07-31 Post by zilong xiao
Tested it; the backslash does not seem to work either.

李奇 <359502...@qq.com> wrote on Fri, Jul 31, 2020 at 8:25 PM:

> Adding a backslash should work: \;  Though the semicolon shouldn't be a special character anyway.
>
> > On Jul 31, 2020 at 8:13 PM, zilong xiao wrote:
> >
> > SPLIT_INDEX(${xxx}, ';', 0): I want to split a string on semicolons, but the semicolon seems to be treated as a special character and the syntax check never passes. Online sources say it can be escaped, but I haven't figured out how. Has anyone hit a similar problem and can give me some pointers?
>


Re: Flink SQL escape character issue

2020-07-31 Post by 李奇
Adding a backslash should work: \;  Though the semicolon shouldn't be a special character anyway.

> On Jul 31, 2020 at 8:13 PM, zilong xiao wrote:
> 
> SPLIT_INDEX(${xxx}, ';', 0): I want to split a string on semicolons, but the semicolon seems to be treated as a special character and the syntax check never passes. Online sources say it can be escaped, but I haven't figured out how. Has anyone hit a similar problem and can give me some pointers?


Flink SQL escape character issue

2020-07-31 Post by zilong xiao
SPLIT_INDEX(${xxx}, ';', 0): I want to split a string on semicolons, but the semicolon seems to be treated as a special character and the syntax check never passes. Online sources say it can be escaped, but I haven't figured out how. Has anyone hit a similar problem and can give me some pointers?


Re: Flink 1.11 submit job timed out

2020-07-31 Post by Matt Wang
I ran into the same problem: the job could only be submitted normally after starting the taskmanager-query-state-service.yaml service. Also, I was testing on a
locally installed k8s cluster. If it were a GC problem, whether or not the TM service is started should not make any difference.


--

Best,
Matt Wang


On 07/27/2020 15:01,Yang Wang wrote:
I suggest first setting heartbeat.timeout to a larger value, and then printing the GC log
to see whether full GC happens frequently and how long each occurrence lasts. From the log you have provided so far, even the in-process JM->RM heartbeat times out,
so I still suspect this is GC related.

env.java.opts.jobmanager: -Xloggc:/jobmanager-gc.log
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=2 -XX:GCLogFileSize=512M


Best,
Yang

SmileSmile wrote on Mon, Jul 27, 2020 at 1:50 PM:

Hi, Yang Wang

Because the log was too long, I removed some repeated content.
At first I also suspected a JM GC problem, but after increasing the JM memory to 10g the situation was the same.

Best



a511955993
Email: a511955...@163.com

On 07/27/2020 11:36, Yang Wang wrote:
Looking at your job, the root cause of the failure is not "No hostname could be resolved";
the reason for that WARNING can be discussed separately (if it does not occur in 1.10).
If you start a Standalone cluster locally you will also see this WARNING; it does not affect normal operation.


The failure is caused by the slot request timing out after 5 minutes. In the log you provided, the period from 2020-07-23 13:55:45,519 to 2020-07-23
13:58:18,037 is blank; you did not omit anything there, did you?
During that period the tasks should normally have started deploying. In the log, the JM->RM heartbeat timed out, and even communication within the same process in the same Pod timed out,
so I suspect the JM was stuck in full GC the whole time; please confirm this.


Best,
Yang

SmileSmile wrote on Thu, Jul 23, 2020 at 2:43 PM:

Hi Yang Wang

First let me share the environment versions on my side:

kubernetes: 1.17.4.   CNI: weave

Items 1, 2 and 3 below are my questions; item 4 is the JM log.

1. After removing taskmanager-query-state-service.yaml, reverse nslookup indeed fails:

kubectl exec -it busybox2 -- /bin/sh
/ # nslookup 10.47.96.2
Server:  10.96.0.10
Address: 10.96.0.10:53

** server can't find 2.96.47.10.in-addr.arpa: NXDOMAIN

2. Flink 1.11 vs Flink 1.10

On the "detail subtasks taskmanagers xxx x" line,
1.11 now shows 172-20-0-50, while 1.10 showed flink-taskmanager-7b5d6958b6-sfzlk:36459. What changed here? (This cluster currently runs both 1.10 and 1.11; 1.10 runs fine. If coredns were the problem, Flink 1.10 should show the same symptom, shouldn't it?)

3. Does coredns need any special configuration?

Resolving domain names inside the container works fine; only reverse resolution fails when there is no service. Is there anything that needs to be configured in coredns?

4. The JM log at the time of the timeout is as follows:



2020-07-23 13:53:00,228 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
ResourceManager akka.tcp://flink@flink-jobmanager
:6123/user/rpc/resourcemanager_0
was granted leadership with fencing token

2020-07-23 13:53:00,232 INFO
org.apache.flink.runtime.rpc.akka.AkkaRpcService [] -
Starting
RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher
at akka://flink/user/rpc/dispatcher_1 .
2020-07-23 13:53:00,233 INFO
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl []
-
Starting the SlotManager.
2020-07-23 13:53:03,472 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Registering TaskManager with ResourceID 1f9ae0cd95a28943a73be26323588696
(akka.tcp://flink@10.34.128.9:6122/user/rpc/taskmanager_0) at
ResourceManager
2020-07-23 13:53:03,777 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Registering TaskManager with ResourceID cac09e751264e61615329c20713a84b4
(akka.tcp://flink@10.32.160.6:6122/user/rpc/taskmanager_0) at
ResourceManager
2020-07-23 13:53:03,787 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Registering TaskManager with ResourceID 93c72d01d09f9ae427c5fc980ed4c1e4
(akka.tcp://flink@10.39.0.8:6122/user/rpc/taskmanager_0) at
ResourceManager
2020-07-23 13:53:04,044 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Registering TaskManager with ResourceID 8adf2f8e81b77a16d5418a9e252c61e2
(akka.tcp://flink@10.38.64.7:6122/user/rpc/taskmanager_0) at
ResourceManager
2020-07-23 13:53:04,099 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Registering TaskManager with ResourceID 23e9d2358f6eb76b9ae718d879d4f330
(akka.tcp://flink@10.42.160.6:6122/user/rpc/taskmanager_0) at
ResourceManager
2020-07-23 13:53:04,146 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Registering TaskManager with ResourceID 092f8dee299e32df13db3111662b61f8
(akka.tcp://flink@10.33.192.14:6122/user/rpc/taskmanager_0) at
ResourceManager


2020-07-23 13:55:44,220 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] -
Received
JobGraph submission 99a030d0e3f428490a501c0132f27a56 (JobTest).
2020-07-23 13:55:44,222 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] -
Submitting job 99a030d0e3f428490a501c0132f27a56 (JobTest).
2020-07-23 13:55:44,251 INFO
org.apache.flink.runtime.rpc.akka.AkkaRpcService [] -
Starting
RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at
akka://flink/user/rpc/jobmanager_2 .
2020-07-23 13:55:44,260 INFO
org.apache.flink.runtime.jobmaster.JobMaster
[] - Initializing job JobTest
(99a030d0e3f428490a501c0132f27a56).
2020-07-23 13:55:44,278 INFO
org.apache.flink.runtime.jobmaster.JobMaster
[] - Using restart back off time strategy
NoRestartBackoffTimeStrategy for JobTest
(99a030d0e3f428490a501c0132f27a56).
2020-07-23 13:55:44,319 INFO
org.apache.flink.runtime.jobmaster.JobMaster
[] - Running initialization on master for job JobTest
(99a030d0e3f428490a501c0132f27a56).
2020-07-23 13:55:44,319 INFO
org.apache.flink.runtime.jobmaster.JobMaster
[] - Successfully ran initialization on master in 0 ms.
2020-07-23 13:55:44,428 INFO

Re: How does RocksDBKeyedStateBackend write to disk

2020-07-31 Post by jun su
hi,

I see that the RocksDBWriteBatchWrapper class has a flushIfNeeded() method; is this the one?

private void flushIfNeeded() throws RocksDBException {
    // Flush the write batch once it holds `capacity` entries, or once its
    // byte size exceeds the configured batch size (if a batch size is set).
    boolean needFlush = batch.count() == capacity
            || (batchSize > 0 && getDataSize() >= batchSize);
    if (needFlush) {
        flush();
    }
}

batchSize comes from the state.backend.rocksdb.write-batch-size configuration option.
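
(For context, a minimal sketch of where these knobs sit when building a job; the class name, checkpoint path and interval are made-up values. The write-batch size above is set via state.backend.rocksdb.write-batch-size in flink-conf.yaml, while the snapshot that produces durable sst files is triggered per checkpoint, i.e. at the interval configured below.)

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBCheckpointExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB state backend with incremental checkpoints; the URI is a placeholder.
        env.setStateBackend(new RocksDBStateBackend("file:///tmp/checkpoints", true));

        // RocksDB snapshots (sst files written as checkpoint data) happen at this interval.
        env.enableCheckpointing(60_000);

        // ... build and execute the job as usual
    }
}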

jiafu <530496...@qq.com> wrote on Fri, Jul 31, 2020 at 4:41 PM:

>
> When the write buffer is full it is flushed to disk. When a checkpoint starts there is a snapshot phase, which makes RocksDB take a checkpoint and flush the data to disk as sst files.
>
>
>
>
> ------ Original message ------
> From: "user-zh" <sujun891...@gmail.com>
> Sent: Friday, July 31, 2020, 4:37 PM
> To: "user-zh"
> Subject: How does RocksDBKeyedStateBackend write to disk
>
>
>
> hi all,
>
> When does RocksDBKeyedStateBackend serialize state to disk: at the end of a window, or at the configured checkpoint interval? Thanks.
>
> --
> Best,
> Jun Su



-- 
Best,
Jun Su


Re: Flink SQL Kafka insert into MySQL: INSERT syntax and unique primary key issues

2020-07-31 Post by Leonard Xu
Hi, chenxuying

I see you are still using "  'connector.type' = 'jdbc', ….  ", which are the old options;
with the old options the primary key still has to be derived from the query.
You need to use the new options [1]: " 'connector' = 'jdbc', …. " so that the declared primary key determines the upsert mode.
 
Best
Leonard
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#connector-options
 


> On Jul 31, 2020 at 16:12, chenxuying wrote:
> 
> hi
> I am using Flink 1.11.0.
> The code is as follows:
> StreamExecutionEnvironment streamEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);
> tableEnvironment.executeSql(" " +
> " CREATE TABLE mySource ( " +
> "  a bigint, " +
> "  b bigint " +
> " ) WITH ( " +
> "  'connector.type' = 'kafka', " +
> "  'connector.version' = 'universal', " +
> "  'connector.topic' = 'mytesttopic', " +
> "  'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " +
> "  'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " +
> "  'connector.properties.group.id' = 'flink-test-cxy', " +
> "  'connector.startup-mode' = 'latest-offset', " +
> "  'format.type' = 'json' " +
> " ) ");
> tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " +
> " id bigint, " +
> "  game_id varchar, " +
> "  PRIMARY KEY (id) NOT ENFORCED  " +
> " )  " +
> " with ( " +
> "  'connector.type' = 'jdbc',   " +
> "  'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , 
> " +
> "  'connector.username' = 'root' , " +
> "  'connector.password' = 'root',  " +
> "  'connector.table' = 'mysqlsink' , " +
> "  'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " +
> "  'connector.write.flush.interval' = '2s',  " +
> "  'connector.write.flush.max-rows' = '300'  " +
> " )");
> tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values 
> (select a,cast(b as varchar) b from mySource)");
> 
> 
> 问题一 : 上面的insert语句会出现如下错误
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot 
> apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY( A, VARCHAR(2147483647) B)>)'. Supported form(s): 
> '$SCALAR_QUERY()'
> 
> 
> 问题二 : 如果insert改成 tableEnvironment.executeSql("insert into mysqlsink select 
> a,cast(b as varchar) b from mySource"); 是可以运行的,但是出现唯一主键重复时会报错
> Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry 
> '1' for key 'PRIMARY'
> 
> 
> 



Re: How does RocksDBKeyedStateBackend write to disk

2020-07-31 Post by jiafu
When the write buffer is full it is flushed to disk. When a checkpoint starts there is a snapshot phase, which makes RocksDB take a checkpoint and flush the data to disk as sst files.




------ Original message ------
From: "user-zh"



How does RocksDBKeyedStateBackend write to disk

2020-07-31 Post by jun su
hi all,

When does RocksDBKeyedStateBackend serialize state to disk: at the end of a window, or at the configured checkpoint interval? Thanks.

-- 
Best,
Jun Su


Re: Flink SQL Kafka insert into MySQL: INSERT syntax and unique primary key issues

2020-07-31 Post by 李奇
Change it to update mode; alternatively, you could change the unique primary key to a natural key.

> On Jul 31, 2020 at 4:13 PM, chenxuying wrote:
> 
> hi
> I am using Flink 1.11.0.
> The code is as follows:
> StreamExecutionEnvironment streamEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);
> tableEnvironment.executeSql(" " +
> " CREATE TABLE mySource ( " +
> "  a bigint, " +
> "  b bigint " +
> " ) WITH ( " +
> "  'connector.type' = 'kafka', " +
> "  'connector.version' = 'universal', " +
> "  'connector.topic' = 'mytesttopic', " +
> "  'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " +
> "  'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " +
> "  'connector.properties.group.id' = 'flink-test-cxy', " +
> "  'connector.startup-mode' = 'latest-offset', " +
> "  'format.type' = 'json' " +
> " ) ");
> tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " +
> " id bigint, " +
> "  game_id varchar, " +
> "  PRIMARY KEY (id) NOT ENFORCED  " +
> " )  " +
> " with ( " +
> "  'connector.type' = 'jdbc',   " +
> "  'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , 
> " +
> "  'connector.username' = 'root' , " +
> "  'connector.password' = 'root',  " +
> "  'connector.table' = 'mysqlsink' , " +
> "  'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " +
> "  'connector.write.flush.interval' = '2s',  " +
> "  'connector.write.flush.max-rows' = '300'  " +
> " )");
> tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values 
> (select a,cast(b as varchar) b from mySource)");
> 
> 
> 问题一 : 上面的insert语句会出现如下错误
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot 
> apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY( A, VARCHAR(2147483647) B)>)'. Supported form(s): 
> '$SCALAR_QUERY()'
> 
> 
> 问题二 : 如果insert改成 tableEnvironment.executeSql("insert into mysqlsink select 
> a,cast(b as varchar) b from mySource"); 是可以运行的,但是出现唯一主键重复时会报错
> Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry 
> '1' for key 'PRIMARY'
> 
> 
> 



Flink SQL Kafka insert into MySQL: INSERT syntax and unique primary key issues

2020-07-31 Post by chenxuying
hi
I am using Flink 1.11.0.
The code is as follows:
StreamExecutionEnvironment streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);
tableEnvironment.executeSql(" " +
" CREATE TABLE mySource ( " +
"  a bigint, " +
"  b bigint " +
" ) WITH ( " +
"  'connector.type' = 'kafka', " +
"  'connector.version' = 'universal', " +
"  'connector.topic' = 'mytesttopic', " +
"  'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " +
"  'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " +
"  'connector.properties.group.id' = 'flink-test-cxy', " +
"  'connector.startup-mode' = 'latest-offset', " +
"  'format.type' = 'json' " +
" ) ");
tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " +
" id bigint, " +
"  game_id varchar, " +
"  PRIMARY KEY (id) NOT ENFORCED  " +
" )  " +
" with ( " +
"  'connector.type' = 'jdbc',   " +
"  'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " 
+
"  'connector.username' = 'root' , " +
"  'connector.password' = 'root',  " +
"  'connector.table' = 'mysqlsink' , " +
"  'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " +
"  'connector.write.flush.interval' = '2s',  " +
"  'connector.write.flush.max-rows' = '300'  " +
" )");
tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values 
(select a,cast(b as varchar) b from mySource)");


问题一 : 上面的insert语句会出现如下错误
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply 
'$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY()'. Supported form(s): '$SCALAR_QUERY()'


问题二 : 如果insert改成 tableEnvironment.executeSql("insert into mysqlsink select 
a,cast(b as varchar) b from mySource"); 是可以运行的,但是出现唯一主键重复时会报错
Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry 
'1' for key 'PRIMARY'