standalone K8S 如何查看 TaskMananger 的 gc.log ?

2021-06-18 Thread WeiXubin
请问 *standalone K8S* 部署模式为 *Deploy Application Cluster* 在哪获取查看/怎么配置 TaskMananger 的 *gc.log* 日志? -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re:FlinkSQL join 维表后一定会变成 upsert流吗?

2021-06-10 Thread WeiXubin
感谢你的回答,我这边看了官网目前 join 一共可以分为 Regular Joins 、 Interval Joins 以及 Temporal Joins 三大类。 我上面问题所述的确是采用了 Regular Joins 的方式。 之后我也尝试使用了 Lookup Join 但发现其最后也是转为 INSERT INTO ON DUPLICATE KEY UPDATE 的执行语句, 并不是我所期望的纯 append 模式 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 关于 LEFT JOIN DimTable FOR SYSTEM_TIME AS OF 的异常求教!

2021-06-10 Thread WeiXubin
已解决,问题在于 Kafka 不是直接 join 维表,而是先和 UDTF join,之后整体才与维表 Join。 所以之前起别名的位置有误。导致找不到字段、改造如下: 先前写法: INSERT INTO sinktable select // 省略字段 from KafkaTable,LATERAL TABLE(RequestBodyColumnToRow(message, 'uid,game_id(BIGINT),platform' )) *as k* LEFT JOIN DimTable FOR SYSTEM_TIME AS OF k.record_time as d

关于 LEFT JOIN DimTable FOR SYSTEM_TIME AS OF 的异常求教!

2021-06-10 Thread WeiXubin
异常:column 'record_time' not found in table 'k' 异常描述:KafkaTable k 表在与维表进行 look up join 时定义了别名,之后报在 k 表中没有定义 record_time 字段。 Flink 版本: 1.12.2 // Source 表 CREATE TABLE KafkaTable ( message STRING, record_time TIMESTAMP(3) METADATA FROM 'timestamp' ) WITH ( 'connector' = 'kafka', ); //

FlinkSQL join 维表后一定会变成 upsert流吗?

2021-06-09 Thread WeiXubin
请教各位一下,我使用 FlinkSQL 编写任务时,kafka source -> MySQL sink 不设置主键,查看了一下 request mode 是 [INSERT] ,也就是普通的 append 流,这很正常。 但是当我关联上维表后,发现 request mode 变成了 [INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE],这时异常报错会要求我给 sink 表设置主键,当我设置上主键后就会变成了 upsert 流。 upsert流底层实现原理是 INSERT INTO ... DUPLICATE KEY

Re: FlinkSQL cannot update pk column UID to expr

2021-06-08 Thread WeiXubin
我把表移动到了普通 MYSQL,可以正常运行。 经过排查应该是 ADB 建表 SQL 中的 DISTRIBUTE BY HASH(`uid`) 所导致,该语法用于处理数据倾斜问题。 看起来似乎是联表 join 的时候要求定义主键,但是定义主键后会转换为 upsert 流,而 ADB 中定义了 DISTRIBUTE BY 与 upsert 冲突了,不知道是否这么理解 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: FlinkSQL cannot update pk column UID to expr

2021-06-08 Thread WeiXubin
详细的异常打印信息如下: java.sql.BatchUpdateException: [3, 2021060816420017201616500303151172306] cannot update pk column UID to expr at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at

FlinkSQL cannot update pk column UID to expr

2021-06-08 Thread WeiXubin
基础场景: 从 KafkaSource 输入数据,输出到 sinktable, 期间 Left join 关联 DimTable 维表。 Flink 版本 1.12.2 场景1:当把 sinktable 设置为 'connector' = 'print' ,不设置任何主键,可正常关联输出 场景2:当把 sinktable 设置为 'connector' = 'mysql' 则会要求加上 primary key 场景3:在 sinktable 加上 PRIMARY KEY (uid, platform,root_game_id) NOT ENFORCED 则报错,主要报错信息:

Re: 关于 flinksql 维表的问题

2021-06-02 Thread WeiXubin
你好,可以麻烦详细描述一下吗? 谢谢 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink SQL 1.11.3问题请教

2021-06-02 Thread WeiXubin
rity); collect(row); 具体使用可参考:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/ Best,Weixubin -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 关于 flinksql 维表的问题

2021-05-28 Thread WeiXubin
感谢各位的建议 -- Sent from: http://apache-flink.147419.n8.nabble.com/

关于 flinksql 维表的问题

2021-05-22 Thread WeiXubin
我想实现将MySQL中的 A 表数据预先查询出来进行缓存,用于给流表 B 进行 join关联。接下来定时查询并更新 A 表内的缓存数据,请问目前 FlinkSQL 可以实现吗? -- Sent from: http://apache-flink.147419.n8.nabble.com/

类型转换问题 String 类型如何转 decimal类型

2021-05-13 Thread WeiXubin
source 端接收到的数据类型为 String, sink 端 MySQL 数据库字段类型定义为 decimal(12, 2) , 在编写 insert 语句时该如何把 String 类型转换为 decimal, 尝试了 flink build-in 的 cast 并不行,请问各位有什么好的方法? -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: kafka table connector eventTime的问题

2020-11-04 Thread WeiXubin
Hi,我这有一个使用Datastream开发简单例子,接收Kafka(Event Time)数据并进行开窗聚合。Kafka数据格式如:{"word":"a","count":1,"time":1604286564},可以看看该Demo对你是否有所帮助。 public class MyExample { public static void main(String[] args) throws Exception { // 创建环境 StreamExecutionEnvironment env =

Re: Flink1.10 sink to mysql SocketException

2020-10-09 Thread WeiXubin
Hi, 你可以看下这个 Jira , https://issues.apache.org/jira/browse/FLINK-16681,似乎与你问题相关。 由于长时间没有数据导致 connection 断开问题,该问题已经在1.11版本修复。 Best, Weixubin -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 关于Flink1.11 CSV Format的一些疑问

2020-08-08 Thread WeiXubin
感谢 -- Sent from: http://apache-flink.147419.n8.nabble.com/

关于Flink1.11 CSV Format的一些疑问

2020-08-07 Thread WeiXubin
Hi,我在Flink1.11版本,使用filesystem connector的时候,读取csv文件并输出到另外一个csv文件遇到了些问题,问题如下: 问题1:sink 的path指定具体输出文件名,但是输出的结果是 文件夹形式 问题2:在flink1.11的文档中没有找到csv的 ignore-first-line 忽略第一行这个配置 测试数据 11101322000220200517145507667060666706;9 11101412000220200515163257249700624970;9 11101412010220200514163709315410631541;9

Re: flink1.11查询结果每秒入库到mysql数量很少

2020-07-24 Thread WeiXubin
Hi, 你可以尝试改写url,加上rewritebatchedstatements=true,如下: jdbc:mysql://198.2.2.71:3306/bda?useSSL=false=true MySQL Jdbc驱动在默认情况下会无视executeBatch()语句,把期望批量执行的一组sql语句拆散,一条一条地发给MySQL数据库,直接造成较低的性能。把rewriteBatchedStatements参数置为true, 驱动才会帮你批量执行SQL。 祝好 weixubin -- Sent from: http://apache-flink.147419.n8

Re: Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-23 Thread WeiXubin
感谢 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-23 Thread Weixubin
Hi, 我想请教下,使用streamExecutionEnv.execute("from kafka sink hbase") 是可以指定Job的名称。 而当改用streamTableEnv.executeSql(sql)的方式时,似乎无法定义Job的名称。 请问有什么解决的方法吗? 在 2020-07-08 16:07:17,"Jingsong Li" 写道: >Hi, > >你的代码里:streamTableEnv.executeSql,它的意思就是已经提交到集群异步的去执行了。 > >所以你后面

Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-23 Thread WeiXubin
Hi, 我想请问下使用 streamExecutionEnv.execute("from kafka sink hbase"),通过这种方式可以给Job指定名称。 而当使用streamTableEnv.executeSql(sql)之后似乎无法给Job定义名称。 请问有什么解决方案吗?谢谢 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re: FlinkSQL 是否支持类似临时中间表的概念

2020-06-24 Thread Weixubin
equest_uri)) as T(...) >) t >group by TUMBLE(ts, INTERVAL '30' SECOND) > >祝好, >Leonard > > >> 在 2020年6月24日,12:09,Weixubin <18925434...@163.com> 写道: >> >> 第一个数据处理过程就是将 request_uri 解析为多个属性(多列),存成一行,作为一条记录。 >

Re:Re: FlinkSQL 是否支持类似临时中间表的概念

2020-06-23 Thread Weixubin
L TABLE(ParseUriRow(request_uri)) as T( )….` >的这段sql是可以复用的,就和 VIEW的作用类似。 > >如果你不需要中间结果表,只是要最终的结果表,那你写个嵌套的sql就行了,里层是`select * from sourceTable , LATERAL >TABLE(ParseUriRow(request_uri)) as T( )….·,外层是 group by, 插入最终的结果表就能满足需求了吧。 > >祝好, >Leonard Xu > > >>

Re:Re: FlinkSQL 是否支持类似临时中间表的概念

2020-06-23 Thread Weixubin
Hi, 关于这句 “把 ` select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….` 这段 sql insert 到中间结果表 和 group后再写入最终结果表就可以了” 我不太明白所说的中间结果表是什么意思, 我所理解为在数据库创建多一张middleSinkTable表,作为中间结果表。请问这样理解是否有误? 可否简单举个例子。 Thanks, Bin 在 2020-06-23 11:57:28,"Leonard Xu" 写道: >Hi, >是的,这个是在

Re:Re: FlinkSQL 是否支持类似临时中间表的概念

2020-06-22 Thread Weixubin
感谢,我查阅了下资料后发现CREATE VIEW这个语法是在Flink.1.12有提及而1.10版本没有 ,1.12版本暂未发行, 而我目前使用的版本是1.10版本。 而且很奇怪,我并没有找到1.11版本的文档 在 2020-06-23 10:58:25,"Leonard Xu" 写道: >Hi, > >> 在 2020年6月23日,10:49,Weixubin <18925434...@163.com> 写道: >> >> //这个时候我希望能够创建一张临时中间表 t

FlinkSQL 是否支持类似临时中间表的概念

2020-06-22 Thread Weixubin
Hi, 我希望通过FlinkSQL的方式在一个Job中完成两步的操作,但似乎办不到,情况大致如下: eg.有一个ETL过程,需要从Source获取数据--将每条数据拆分为一条多列数据--对拆分完的数据开窗聚合--输出到sink。 //从Source获取数据 CREATE TABLE sourceTable ( request_uri STRING ) WITH ( .. ); //这个时候我希望能够创建一张临时中间表 tempTable用来存放 对Source表中数据拆分为多列后的结果,类似于下面这样(Flink 并不支持这么做) CREATE

Re:flink 通过 --classpath 传入https-xxx.xxx.jar 在1.8上正常运行 在flink1.10 就会报传入的jar包找不到

2020-06-22 Thread Weixubin
和版本应该没什么关系。如果是多节点部署的情况下,-C 所指定的URL 需要各个节点都能访问得到。 确认下该URL能被所有节点访问到吗 Best, Bin At 2020-06-22 11:43:11, "程龙" <13162790...@163.com> wrote: >2020-06-22 10:16:34,379 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed >(6/6)

Re:Re: 回复: 关于拓展 Tuple元组的问题

2020-06-19 Thread Weixubin
, Types.STRING ); } } 在 2020-06-19 15:46:42,"Jark Wu" 写道: >用 Row 啊,支持 Int.MAX 个元素,还能支持 null 值,不香么? > > >On Fri, 19 Jun 2020 at 15:42, Weixubin <18925434...@163.com> wrote: > >> 感谢你的回答,请问可否举一个参照例子? >> >> >>

Re:回复: 关于拓展 Tuple元组的问题

2020-06-19 Thread Weixubin
感谢你的回答,请问可否举一个参照例子? 在 2020-06-19 15:31:53,"wangweigu...@stevegame.cn" 写道: > > 多个值组合在一起,当一个复合值使用! > > > > >发件人: 魏旭斌 >发送时间: 2020-06-19 15:01 >收件人: user-zh >主题: 关于拓展 Tuple元组的问题 >目前Flink 提供了Tuple1 ~ Tuple25,在实际开发中不能满足我们的需求,我们希望能有更大的元组,比如构建一个Tuple50。 >请问有什么解决的方案? 谢谢