请问 *standalone K8S* 部署模式为 *Deploy Application Cluster* 在哪获取查看/怎么配置
TaskMananger 的 *gc.log* 日志?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
感谢你的回答,我这边看了官网目前 join 一共可以分为 Regular Joins 、 Interval Joins 以及 Temporal Joins
三大类。 我上面问题所述的确是采用了 Regular Joins 的方式。 之后我也尝试使用了 Lookup Join 但发现其最后也是转为
INSERT INTO ON DUPLICATE KEY UPDATE 的执行语句, 并不是我所期望的纯 append 模式
--
Sent from: http://apache-flink.147419.n8.nabble.com/
已解决,问题在于 Kafka 不是直接 join 维表,而是先和 UDTF join,之后整体才与维表 Join。
所以之前起别名的位置有误。导致找不到字段、改造如下:
先前写法:
INSERT INTO sinktable select
// 省略字段
from KafkaTable,LATERAL TABLE(RequestBodyColumnToRow(message,
'uid,game_id(BIGINT),platform'
)) *as k* LEFT JOIN DimTable FOR SYSTEM_TIME AS OF k.record_time as d
异常:column 'record_time' not found in table 'k'
异常描述:KafkaTable k 表在与维表进行 look up join 时定义了别名,之后报在 k 表中没有定义 record_time 字段。
Flink 版本: 1.12.2
// Source 表
CREATE TABLE KafkaTable (
message STRING,
record_time TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
);
//
请教各位一下,我使用 FlinkSQL 编写任务时,kafka source -> MySQL sink 不设置主键,查看了一下 request
mode 是 [INSERT] ,也就是普通的 append 流,这很正常。
但是当我关联上维表后,发现 request mode 变成了 [INSERT, UPDATE_BEFORE, UPDATE_AFTER,
DELETE],这时异常报错会要求我给 sink 表设置主键,当我设置上主键后就会变成了 upsert 流。
upsert流底层实现原理是 INSERT INTO ... DUPLICATE KEY
我把表移动到了普通 MYSQL,可以正常运行。 经过排查应该是 ADB 建表 SQL 中的 DISTRIBUTE BY HASH(`uid`)
所导致,该语法用于处理数据倾斜问题。 看起来似乎是联表 join 的时候要求定义主键,但是定义主键后会转换为 upsert 流,而 ADB 中定义了
DISTRIBUTE BY 与 upsert 冲突了,不知道是否这么理解
--
Sent from: http://apache-flink.147419.n8.nabble.com/
详细的异常打印信息如下:
java.sql.BatchUpdateException: [3,
2021060816420017201616500303151172306] cannot update pk column UID to expr
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
基础场景: 从 KafkaSource 输入数据,输出到 sinktable, 期间 Left join 关联 DimTable 维表。
Flink 版本 1.12.2
场景1:当把 sinktable 设置为 'connector' = 'print' ,不设置任何主键,可正常关联输出
场景2:当把 sinktable 设置为 'connector' = 'mysql' 则会要求加上 primary key
场景3:在 sinktable 加上 PRIMARY KEY (uid, platform,root_game_id) NOT ENFORCED
则报错,主要报错信息:
你好,可以麻烦详细描述一下吗? 谢谢
--
Sent from: http://apache-flink.147419.n8.nabble.com/
rity);
collect(row);
具体使用可参考:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/
Best,Weixubin
--
Sent from: http://apache-flink.147419.n8.nabble.com/
感谢各位的建议
--
Sent from: http://apache-flink.147419.n8.nabble.com/
我想实现将MySQL中的 A 表数据预先查询出来进行缓存,用于给流表 B 进行 join关联。接下来定时查询并更新 A 表内的缓存数据,请问目前
FlinkSQL 可以实现吗?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
source 端接收到的数据类型为 String, sink 端 MySQL 数据库字段类型定义为 decimal(12, 2) , 在编写
insert 语句时该如何把 String 类型转换为 decimal, 尝试了 flink build-in 的 cast
并不行,请问各位有什么好的方法?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Hi,我这有一个使用Datastream开发简单例子,接收Kafka(Event
Time)数据并进行开窗聚合。Kafka数据格式如:{"word":"a","count":1,"time":1604286564},可以看看该Demo对你是否有所帮助。
public class MyExample {
public static void main(String[] args) throws Exception {
// 创建环境
StreamExecutionEnvironment env =
Hi,
你可以看下这个 Jira , https://issues.apache.org/jira/browse/FLINK-16681,似乎与你问题相关。
由于长时间没有数据导致 connection 断开问题,该问题已经在1.11版本修复。
Best,
Weixubin
--
Sent from: http://apache-flink.147419.n8.nabble.com/
感谢
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Hi,我在Flink1.11版本,使用filesystem connector的时候,读取csv文件并输出到另外一个csv文件遇到了些问题,问题如下:
问题1:sink 的path指定具体输出文件名,但是输出的结果是 文件夹形式
问题2:在flink1.11的文档中没有找到csv的 ignore-first-line 忽略第一行这个配置
测试数据
11101322000220200517145507667060666706;9
11101412000220200515163257249700624970;9
11101412010220200514163709315410631541;9
Hi,
你可以尝试改写url,加上rewritebatchedstatements=true,如下:
jdbc:mysql://198.2.2.71:3306/bda?useSSL=false=true
MySQL
Jdbc驱动在默认情况下会无视executeBatch()语句,把期望批量执行的一组sql语句拆散,一条一条地发给MySQL数据库,直接造成较低的性能。把rewriteBatchedStatements参数置为true,
驱动才会帮你批量执行SQL。
祝好
weixubin
--
Sent from: http://apache-flink.147419.n8
感谢
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Hi,
我想请教下,使用streamExecutionEnv.execute("from kafka sink hbase") 是可以指定Job的名称。
而当改用streamTableEnv.executeSql(sql)的方式时,似乎无法定义Job的名称。
请问有什么解决的方法吗?
在 2020-07-08 16:07:17,"Jingsong Li" 写道:
>Hi,
>
>你的代码里:streamTableEnv.executeSql,它的意思就是已经提交到集群异步的去执行了。
>
>所以你后面
Hi,
我想请问下使用 streamExecutionEnv.execute("from kafka sink
hbase"),通过这种方式可以给Job指定名称。
而当使用streamTableEnv.executeSql(sql)之后似乎无法给Job定义名称。
请问有什么解决方案吗?谢谢
--
Sent from: http://apache-flink.147419.n8.nabble.com/
equest_uri)) as T(...)
>) t
>group by TUMBLE(ts, INTERVAL '30' SECOND)
>
>祝好,
>Leonard
>
>
>> 在 2020年6月24日,12:09,Weixubin <18925434...@163.com> 写道:
>>
>> 第一个数据处理过程就是将 request_uri 解析为多个属性(多列),存成一行,作为一条记录。
>
L TABLE(ParseUriRow(request_uri)) as T( )….`
>的这段sql是可以复用的,就和 VIEW的作用类似。
>
>如果你不需要中间结果表,只是要最终的结果表,那你写个嵌套的sql就行了,里层是`select * from sourceTable , LATERAL
>TABLE(ParseUriRow(request_uri)) as T( )….·,外层是 group by, 插入最终的结果表就能满足需求了吧。
>
>祝好,
>Leonard Xu
>
>
>>
Hi,
关于这句 “把 ` select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri))
as T( )….` 这段 sql insert 到中间结果表 和 group后再写入最终结果表就可以了”
我不太明白所说的中间结果表是什么意思, 我所理解为在数据库创建多一张middleSinkTable表,作为中间结果表。请问这样理解是否有误? 可否简单举个例子。
Thanks,
Bin
在 2020-06-23 11:57:28,"Leonard Xu" 写道:
>Hi,
>是的,这个是在
感谢,我查阅了下资料后发现CREATE VIEW这个语法是在Flink.1.12有提及而1.10版本没有 ,1.12版本暂未发行,
而我目前使用的版本是1.10版本。
而且很奇怪,我并没有找到1.11版本的文档
在 2020-06-23 10:58:25,"Leonard Xu" 写道:
>Hi,
>
>> 在 2020年6月23日,10:49,Weixubin <18925434...@163.com> 写道:
>>
>> //这个时候我希望能够创建一张临时中间表 t
Hi,
我希望通过FlinkSQL的方式在一个Job中完成两步的操作,但似乎办不到,情况大致如下:
eg.有一个ETL过程,需要从Source获取数据--将每条数据拆分为一条多列数据--对拆分完的数据开窗聚合--输出到sink。
//从Source获取数据
CREATE TABLE sourceTable (
request_uri STRING
) WITH (
..
);
//这个时候我希望能够创建一张临时中间表 tempTable用来存放 对Source表中数据拆分为多列后的结果,类似于下面这样(Flink 并不支持这么做)
CREATE
和版本应该没什么关系。如果是多节点部署的情况下,-C 所指定的URL 需要各个节点都能访问得到。 确认下该URL能被所有节点访问到吗
Best,
Bin
At 2020-06-22 11:43:11, "程龙" <13162790...@163.com> wrote:
>2020-06-22 10:16:34,379 INFO
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed
>(6/6)
,
Types.STRING
);
}
}
在 2020-06-19 15:46:42,"Jark Wu" 写道:
>用 Row 啊,支持 Int.MAX 个元素,还能支持 null 值,不香么?
>
>
>On Fri, 19 Jun 2020 at 15:42, Weixubin <18925434...@163.com> wrote:
>
>> 感谢你的回答,请问可否举一个参照例子?
>>
>>
>>
感谢你的回答,请问可否举一个参照例子?
在 2020-06-19 15:31:53,"wangweigu...@stevegame.cn"
写道:
>
> 多个值组合在一起,当一个复合值使用!
>
>
>
>
>发件人: 魏旭斌
>发送时间: 2020-06-19 15:01
>收件人: user-zh
>主题: 关于拓展 Tuple元组的问题
>目前Flink 提供了Tuple1 ~ Tuple25,在实际开发中不能满足我们的需求,我们希望能有更大的元组,比如构建一个Tuple50。
>请问有什么解决的方案? 谢谢
29 matches
Mail list logo